[
https://issues.apache.org/jira/browse/JAMES-4154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18043310#comment-18043310
]
Benoit Tellier commented on JAMES-4154:
---------------------------------------
Hi [~matthieu]
> Maybe you could point us to a design documentation about how Event Bus works
> right now and what you mean by "empty registration key"?
Means "pub-sub" for eg WS push, IMAP session, etc.
See:
- https://github.com/apache/james-project/blob/master/src/adr/0037-eventbus.md
- https://github.com/apache/james-project/blob/master/src/adr/0037-eventbus.md
> Whether James Event Bus meets the requirements about error handling and so on
> is not clear to me, but it could be extended anyway.
It perfectly fits.
We would just need a minor enhacement so that initial delivery is time-bound
(all listeners are execued at once to reduce the chatter) so that a slow
listener do not impact all. But it is minor.
> Tasks
Well that's unfortunate, tasks are :
- truelly boiler plate
- running on an expclusive queue
- Not good at defining sub units of work
We would need to span from a few hundred message (common case is ~10) to
hundreds. I'm rather reluctant to use tasks here.
Current code is closer to the event bus (small events, potentially many of
them) so I would rather keep the code that way, as it is already battle tested.
> Generalize RabbitMQ work queue code used for async deletion callback(s)
> -----------------------------------------------------------------------
>
> Key: JAMES-4154
> URL: https://issues.apache.org/jira/browse/JAMES-4154
> Project: James Server
> Issue Type: Improvement
> Components: deletedMessageVault, Queue
> Affects Versions: master
> Reporter: Tran Hong Quan
> Priority: Minor
>
> h2. Why
> Today, we have the deleted message vault that handles deleted message
> callback asynchronously, relying on the RabbitMQ work queue to avoid timeout
> consuming e.g. DTM for deletion of a mailbox having 1 million emails.
> We may want to have other async deletion callback(s) that could rely on
> RabbiMQ work queue too.
> We should find a way to mutualize the shared RabbitMQ work queue setup, and
> plug different callback(s) easily.
> h2. How
> - Introduce `AsyncDeletionCallback` interface
> ```java
> public interface AsyncDeletionCallback extends
> DeleteMessageListener.DeletionCallback {
> }
> ```
> - Implement `AggregatedAsyncDeletionCallback` that uses the RabbitMQ work
> queue code similar to
> current`DistributedDeletedMessageVaultDeletionCallback`, but takes an
> `AsyncDeletionCallback` set as an argument.
> The idea is to share the same RabbitMQ work queue e.g.
> `async-deletion-work-queue`, to handle all the async deletion callback(s).
> ```java
> private Mono<Void> handleMessage(AcknowledgableDelivery delivery) {
> try {
> CopyCommandDTO copyCommandDTO =
> objectMapper.readValue(delivery.getBody(), CopyCommandDTO.class);
> return Flux.fromIterable(asyncDeletionCallbacks)
> .flatMap(callback ->
> callback.forMessage(copyCommandDTO.asPojo(mailboxIdFactory, messageIdFactory,
> blobIdFactory)))
> .then()
> .timeout(Duration.ofMinutes(5))
> .doOnSuccess(any -> delivery.ack())
> .doOnCancel(() -> delivery.nack(REQUEUE))
> .onErrorResume(e -> {
> LOGGER.error("Failed executing async deletion callbacks
> for {}", copyCommandDTO.messageId, e);
> delivery.nack(REQUEUE);
> return Mono.empty();
> });
> ```
> - Refactor `DistributedDeletedMessageVaultDeletionCallback`: strip all the
> rabbitmq code, just implement `AsyncDeletionCallback`
> - Refactor `DeletedMessageVaultWorkQueueReconnectionHandler` so it handles
> reconnection for `AggregatedAsyncDeletionCallback` instead.
> - Guice binding to plug DTM as an `AsyncDeletionCallback`
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]