This is an automated email from the ASF dual-hosted git repository.
hqtran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 1bb138d78f JAMES-4165 Deprioritize non critical groups upon event dead letter events redelivery (#2917)
1bb138d78f is described below
commit 1bb138d78f770bef1fbae975bd36e1ac42f37113
Author: Trần Hồng Quân <[email protected]>
AuthorDate: Thu Jan 22 13:36:35 2026 +0700
JAMES-4165 Deprioritize non critical groups upon event dead letter events redelivery (#2917)
---
.../WebadminMailboxTaskSerializationModule.java | 10 ++-
.../service/EventDeadLettersRedeliverAllTask.java | 6 +-
.../EventDeadLettersRedeliverAllTaskDTO.java | 7 +-
.../webadmin/service/EventDeadLettersService.java | 11 ++-
.../james/webadmin/service/EventRetriever.java | 17 +++-
.../routes/EventDeadLettersRoutesTest.java | 4 +-
.../service/EventDeadLettersRedeliverTaskTest.java | 18 +++--
.../james/webadmin/service/EventRetrieverTest.java | 91 ++++++++++++++++++++++
8 files changed, 144 insertions(+), 20 deletions(-)
diff --git
a/server/container/guice/protocols/webadmin-mailbox/src/main/java/org/apache/james/modules/server/WebadminMailboxTaskSerializationModule.java
b/server/container/guice/protocols/webadmin-mailbox/src/main/java/org/apache/james/modules/server/WebadminMailboxTaskSerializationModule.java
index 0e6920e734..9a5d87b0a5 100644
---
a/server/container/guice/protocols/webadmin-mailbox/src/main/java/org/apache/james/modules/server/WebadminMailboxTaskSerializationModule.java
+++
b/server/container/guice/protocols/webadmin-mailbox/src/main/java/org/apache/james/modules/server/WebadminMailboxTaskSerializationModule.java
@@ -18,6 +18,11 @@
****************************************************************/
package org.apache.james.modules.server;
+import static
org.apache.james.events.EventDeadLettersHealthCheck.DEAD_LETTERS_IGNORED_GROUPS;
+
+import java.util.Set;
+
+import org.apache.james.events.Group;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.SubscriptionManager;
import org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService;
@@ -52,8 +57,9 @@ import com.google.inject.name.Named;
public class WebadminMailboxTaskSerializationModule extends AbstractModule {
@ProvidesIntoSet
- public TaskDTOModule<? extends Task, ? extends TaskDTO>
eventDeadLettersRedeliverAllTask(EventDeadLettersRedeliverService service) {
- return EventDeadLettersRedeliverAllTaskDTO.module(service);
+ public TaskDTOModule<? extends Task, ? extends TaskDTO>
eventDeadLettersRedeliverAllTask(EventDeadLettersRedeliverService service,
+
@Named(DEAD_LETTERS_IGNORED_GROUPS) Set<Group> nonCriticalGroups)
{
+ return EventDeadLettersRedeliverAllTaskDTO.module(service,
nonCriticalGroups);
}
@ProvidesIntoSet
diff --git
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverAllTask.java
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverAllTask.java
index 4c9e201554..544ef01534 100644
---
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverAllTask.java
+++
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverAllTask.java
@@ -23,8 +23,10 @@ import static
org.apache.james.webadmin.service.EventDeadLettersRedeliverService
import java.time.Clock;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.james.events.Group;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskType;
@@ -39,9 +41,9 @@ public class EventDeadLettersRedeliverAllTask implements Task
{
private final AtomicLong failedRedeliveriesCount;
private final RunningOptions runningOptions;
- EventDeadLettersRedeliverAllTask(EventDeadLettersRedeliverService service,
RunningOptions runningOptions) {
+ EventDeadLettersRedeliverAllTask(EventDeadLettersRedeliverService service,
RunningOptions runningOptions, Set<Group> nonCriticalGroups) {
this.service = service;
- this.eventRetriever = EventRetriever.allEvents();
+ this.eventRetriever = EventRetriever.allEvents(nonCriticalGroups);
this.successfulRedeliveriesCount = new AtomicLong(0L);
this.failedRedeliveriesCount = new AtomicLong(0L);
this.runningOptions = runningOptions;
diff --git
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverAllTaskDTO.java
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverAllTaskDTO.java
index ab6968251d..bffafc7069 100644
---
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverAllTaskDTO.java
+++
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverAllTaskDTO.java
@@ -21,7 +21,9 @@ package org.apache.james.webadmin.service;
import static
org.apache.james.webadmin.service.EventDeadLettersRedeliverService.RunningOptions;
import java.util.Optional;
+import java.util.Set;
+import org.apache.james.events.Group;
import org.apache.james.json.DTOModule;
import org.apache.james.server.task.json.dto.TaskDTO;
import org.apache.james.server.task.json.dto.TaskDTOModule;
@@ -30,11 +32,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class EventDeadLettersRedeliverAllTaskDTO implements TaskDTO {
- public static TaskDTOModule<EventDeadLettersRedeliverAllTask,
EventDeadLettersRedeliverAllTaskDTO> module(EventDeadLettersRedeliverService
service) {
+ public static TaskDTOModule<EventDeadLettersRedeliverAllTask,
EventDeadLettersRedeliverAllTaskDTO> module(EventDeadLettersRedeliverService
service,
+
Set<Group> nonCriticalGroups) {
return DTOModule
.forDomainObject(EventDeadLettersRedeliverAllTask.class)
.convertToDTO(EventDeadLettersRedeliverAllTaskDTO.class)
- .toDomainObjectConverter(dto -> new
EventDeadLettersRedeliverAllTask(service, dto.getRunningOptions()))
+ .toDomainObjectConverter(dto -> new
EventDeadLettersRedeliverAllTask(service, dto.getRunningOptions(),
nonCriticalGroups))
.toDTOConverter((domainObject, typeName) -> new
EventDeadLettersRedeliverAllTaskDTO(typeName, domainObject.getRunningOptions()))
.typeName(EventDeadLettersRedeliverAllTask.TYPE.asString())
.withFactory(TaskDTOModule::new);
diff --git
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
index 7eb6f460be..ab41e22329 100644
---
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
+++
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
@@ -20,12 +20,15 @@
package org.apache.james.webadmin.service;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import jakarta.inject.Inject;
+import jakarta.inject.Named;
import org.apache.james.events.Event;
import org.apache.james.events.EventDeadLetters;
+import org.apache.james.events.EventDeadLettersHealthCheck;
import org.apache.james.events.Group;
import org.apache.james.task.Task;
@@ -37,12 +40,16 @@ import reactor.core.publisher.Mono;
public class EventDeadLettersService {
private final EventDeadLettersRedeliverService redeliverService;
private final EventDeadLetters deadLetters;
+ private final Set<Group> nonCriticalGroups;
@Inject
@VisibleForTesting
- public EventDeadLettersService(EventDeadLettersRedeliverService
redeliverService, EventDeadLetters deadLetters) {
+ public EventDeadLettersService(EventDeadLettersRedeliverService
redeliverService,
+ EventDeadLetters deadLetters,
+
@Named(EventDeadLettersHealthCheck.DEAD_LETTERS_IGNORED_GROUPS) Set<Group>
nonCriticalGroups) {
this.redeliverService = redeliverService;
this.deadLetters = deadLetters;
+ this.nonCriticalGroups = nonCriticalGroups;
}
public List<String> listGroupsAsStrings() {
@@ -73,7 +80,7 @@ public class EventDeadLettersService {
}
public Task
redeliverAllEvents(EventDeadLettersRedeliverService.RunningOptions
runningOptions) {
- return new EventDeadLettersRedeliverAllTask(redeliverService,
runningOptions);
+ return new EventDeadLettersRedeliverAllTask(redeliverService,
runningOptions, nonCriticalGroups);
}
public Task redeliverGroupEvents(Group group,
EventDeadLettersRedeliverService.RunningOptions runningOptions) {
diff --git
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventRetriever.java
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventRetriever.java
index 4d66edbb03..208ffc9c53 100644
---
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventRetriever.java
+++
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventRetriever.java
@@ -21,7 +21,9 @@ package org.apache.james.webadmin.service;
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+import java.util.Comparator;
import java.util.Optional;
+import java.util.Set;
import org.apache.james.events.Event;
import org.apache.james.events.EventDeadLetters;
@@ -32,8 +34,8 @@ import reactor.core.publisher.Mono;
import reactor.util.function.Tuple3;
public interface EventRetriever {
- static EventRetriever allEvents() {
- return new AllEventsRetriever();
+ static EventRetriever allEvents(Set<Group> nonCriticalGroups) {
+ return new AllEventsRetriever(nonCriticalGroups);
}
static EventRetriever groupEvents(Group group) {
@@ -56,6 +58,12 @@ public interface EventRetriever {
}
class AllEventsRetriever implements EventRetriever {
+ private final Set<Group> nonCriticalGroups;
+
+ AllEventsRetriever(Set<Group> nonCriticalGroups) {
+ this.nonCriticalGroups = nonCriticalGroups;
+ }
+
@Override
public Optional<Group> forGroup() {
return Optional.empty();
@@ -69,7 +77,10 @@ public interface EventRetriever {
@Override
public Flux<Tuple3<Group, Event, EventDeadLetters.InsertionId>>
retrieveEvents(EventDeadLetters deadLetters) {
return deadLetters.groupsWithFailedEvents()
- .flatMap(group -> listGroupEvents(deadLetters, group),
DEFAULT_CONCURRENCY);
+ .collectList()
+ .flatMapMany(groups -> Flux.fromIterable(groups)
+
.sort(Comparator.comparing(this.nonCriticalGroups::contains))
+ .flatMapSequential(group -> listGroupEvents(deadLetters,
group), DEFAULT_CONCURRENCY));
}
}
diff --git
a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
index c6f34ae059..29c651e5ed 100644
---
a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
+++
b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
@@ -31,6 +31,8 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
+import java.util.Set;
+
import org.apache.james.core.Username;
import org.apache.james.event.json.MailboxEventSerializer;
import org.apache.james.events.Event;
@@ -127,7 +129,7 @@ class EventDeadLettersRoutesTest {
eventBus1 = new InVMEventBus(new InVmEventDelivery(new
RecordingMetricFactory()), RetryBackoffConfiguration.DEFAULT, deadLetters);
eventBus2 = new InVMEventBus(new InVmEventDelivery(new
RecordingMetricFactory()), RetryBackoffConfiguration.DEFAULT, deadLetters);
EventDeadLettersRedeliverService redeliverService = new
EventDeadLettersRedeliverService(ImmutableSet.of(eventBus1, eventBus2),
deadLetters);
- EventDeadLettersService service = new
EventDeadLettersService(redeliverService, deadLetters);
+ EventDeadLettersService service = new
EventDeadLettersService(redeliverService, deadLetters, Set.of());
taskManager = new MemoryTaskManager(new Hostname("foo"));
webAdminServer = WebAdminUtils.createWebAdminServer(
diff --git
a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTaskTest.java
b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTaskTest.java
index 48f0d2cc99..8acb16721c 100644
---
a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTaskTest.java
+++
b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTaskTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
import java.time.Instant;
import java.util.Optional;
+import java.util.Set;
import org.apache.james.JsonSerializationVerifier;
import org.apache.james.events.EventDeadLetters;
@@ -41,7 +42,8 @@ import org.junit.jupiter.api.Test;
class EventDeadLettersRedeliverTaskTest {
private static final Instant TIMESTAMP =
Instant.parse("2018-11-13T12:00:55Z");
private static final EventDeadLettersRedeliverService SERVICE =
mock(EventDeadLettersRedeliverService.class);
- private static final EventDeadLettersRedeliverAllTask TASK_ALL = new
EventDeadLettersRedeliverAllTask(SERVICE,
EventDeadLettersRedeliverService.RunningOptions.DEFAULT);
+ private static final Set<Group> NON_CRITICAL_GROUPS = Set.of();
+ private static final EventDeadLettersRedeliverAllTask TASK_ALL = new
EventDeadLettersRedeliverAllTask(SERVICE,
EventDeadLettersRedeliverService.RunningOptions.DEFAULT, NON_CRITICAL_GROUPS);
private static final EventDeadLettersRedeliverGroupTask TASK_GROUP = new
EventDeadLettersRedeliverGroupTask(SERVICE, new GenericGroup("abc"),
EventDeadLettersRedeliverService.RunningOptions.DEFAULT);
private static final EventDeadLettersRedeliverOneTask TASK_ONE = new
EventDeadLettersRedeliverOneTask(SERVICE, new GenericGroup("abc"),
EventDeadLetters.InsertionId.of("fcbc3c92-e9a0-4ece-94ed-6e6b45045258"));
@@ -58,7 +60,7 @@ class EventDeadLettersRedeliverTaskTest {
@Test
void redeliverAllTaskShouldMatchJsonSerializationContract() throws
Exception {
-
JsonSerializationVerifier.dtoModule(EventDeadLettersRedeliverAllTaskDTO.module(SERVICE))
+
JsonSerializationVerifier.dtoModule(EventDeadLettersRedeliverAllTaskDTO.module(SERVICE,
NON_CRITICAL_GROUPS))
.bean(TASK_ALL)
.json("{" +
" \"type\": \"event-dead-letters-redeliver-all\"," +
@@ -66,9 +68,9 @@ class EventDeadLettersRedeliverTaskTest {
"}")
.verify();
- EventDeadLettersRedeliverAllTask taskAllWithLimit = new
EventDeadLettersRedeliverAllTask(SERVICE, new RunningOptions(Limit.limit(10)));
+ EventDeadLettersRedeliverAllTask taskAllWithLimit = new
EventDeadLettersRedeliverAllTask(SERVICE, new RunningOptions(Limit.limit(10)),
NON_CRITICAL_GROUPS);
-
JsonSerializationVerifier.dtoModule(EventDeadLettersRedeliverAllTaskDTO.module(SERVICE))
+
JsonSerializationVerifier.dtoModule(EventDeadLettersRedeliverAllTaskDTO.module(SERVICE,
NON_CRITICAL_GROUPS))
.bean(taskAllWithLimit)
.json("{\"type\":\"event-dead-letters-redeliver-all\",
\"runningOptions\":{\"limit\": 10}}")
.verify();
@@ -76,19 +78,19 @@ class EventDeadLettersRedeliverTaskTest {
@Test
void redeliverAllTaskShouldDeserializationSuccess() throws Exception {
- JsonTaskSerializer serializer =
JsonTaskSerializer.of(EventDeadLettersRedeliverAllTaskDTO.module(SERVICE));
+ JsonTaskSerializer serializer =
JsonTaskSerializer.of(EventDeadLettersRedeliverAllTaskDTO.module(SERVICE,
NON_CRITICAL_GROUPS));
assertThat(serializer.deserialize("{\"type\":\"event-dead-letters-redeliver-all\",
\"runningOptions\":{\"limit\": 10}}"))
.usingRecursiveComparison()
- .isEqualTo(new EventDeadLettersRedeliverAllTask(SERVICE, new
RunningOptions(Limit.limit(10))));
+ .isEqualTo(new EventDeadLettersRedeliverAllTask(SERVICE, new
RunningOptions(Limit.limit(10)), NON_CRITICAL_GROUPS));
assertThat(serializer.deserialize("{\"type\":\"event-dead-letters-redeliver-all\",
\"runningOptions\":{}}"))
.usingRecursiveComparison()
- .isEqualTo(new EventDeadLettersRedeliverAllTask(SERVICE, new
RunningOptions(Limit.unlimited())));
+ .isEqualTo(new EventDeadLettersRedeliverAllTask(SERVICE, new
RunningOptions(Limit.unlimited()), NON_CRITICAL_GROUPS));
assertThat(serializer.deserialize("{\"type\":\"event-dead-letters-redeliver-all\"}"))
.usingRecursiveComparison()
- .isEqualTo(new EventDeadLettersRedeliverAllTask(SERVICE, new
RunningOptions(Limit.unlimited())));
+ .isEqualTo(new EventDeadLettersRedeliverAllTask(SERVICE, new
RunningOptions(Limit.unlimited()), NON_CRITICAL_GROUPS));
}
@Test
diff --git
a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/EventRetrieverTest.java
b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/EventRetrieverTest.java
new file mode 100644
index 0000000000..8a17ee92dc
--- /dev/null
+++
b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/service/EventRetrieverTest.java
@@ -0,0 +1,91 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.james.core.Username;
+import org.apache.james.events.Event;
+import org.apache.james.events.EventDeadLetters;
+import org.apache.james.events.Group;
+import org.apache.james.mailbox.events.GenericGroup;
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+
+class EventRetrieverTest {
+ private static class TestEvent implements Event {
+ private final EventId eventId;
+
+ private TestEvent(String id) {
+ this.eventId =
EventId.of(UUID.nameUUIDFromBytes(id.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ @Override
+ public Username getUsername() {
+ return Username.of("user");
+ }
+
+ @Override
+ public boolean isNoop() {
+ return false;
+ }
+
+ @Override
+ public EventId getEventId() {
+ return eventId;
+ }
+ }
+
+ @Test
+ void allEventsRetrieverShouldDeprioritizeNonCriticalGroups() {
+ EventDeadLetters deadLetters = mock(EventDeadLetters.class);
+ Group nonCriticalGroup = new GenericGroup("non-critical");
+ Group criticalGroup = new GenericGroup("critical");
+ EventDeadLetters.InsertionId nonCriticalId =
EventDeadLetters.InsertionId.of(UUID.fromString("71fd8b41-a2bb-4a95-9d45-78f359bc058f"));
+ EventDeadLetters.InsertionId criticalId =
EventDeadLetters.InsertionId.of(UUID.fromString("fd951fe8-84ed-4af7-8340-f854321b76ad"));
+
+
when(deadLetters.groupsWithFailedEvents()).thenReturn(Flux.just(nonCriticalGroup,
criticalGroup));
+
when(deadLetters.failedIds(nonCriticalGroup)).thenReturn(Flux.just(nonCriticalId));
+
when(deadLetters.failedIds(criticalGroup)).thenReturn(Flux.just(criticalId));
+ when(deadLetters.failedEvent(nonCriticalGroup,
nonCriticalId)).thenReturn(Mono.just(new TestEvent("ignored")));
+ when(deadLetters.failedEvent(criticalGroup,
criticalId)).thenReturn(Mono.just(new TestEvent("normal")));
+
+ Set<Group> nonCriticalGroups = Set.of(nonCriticalGroup);
+ EventRetriever eventRetriever =
EventRetriever.allEvents(nonCriticalGroups);
+
+ List<Group> retrievedGroups =
eventRetriever.retrieveEvents(deadLetters)
+ .map(Tuple2::getT1)
+ .collectList()
+ .block();
+
+ // should return non-critical groups last
+ assertThat(retrievedGroups).containsExactly(criticalGroup,
nonCriticalGroup);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]