quantranhong1999 commented on code in PR #2028:
URL: https://github.com/apache/james-project/pull/2028#discussion_r1497367581
##########
event-bus/redis/src/main/java/org/apache/james/events/GroupRegistrationHandler.java:
##########
@@ -104,68 +105,62 @@ GroupRegistration retrieveGroupRegistration(Group group) {
}
public void start() {
-        scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler");
- channelPool.createWorkQueue(
- QueueSpecification.queue(queueName.asString())
- .durable(DURABLE)
- .exclusive(!EXCLUSIVE)
- .autoDelete(!AUTO_DELETE)
-                .arguments(configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
- .deadLetter(namingStrategy.deadLetterExchange())
- .build()),
- BindingSpecification.binding()
- .exchange(namingStrategy.exchange())
- .queue(queueName.asString())
- .routingKey(EMPTY_ROUTING_KEY))
+        scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler"); // create thread pool for the event bus
+
+ redisStreamReactiveCommands
+ .xgroupCreate(XReadArgs.StreamOffset.from(streamName, "0-0"),
+ CONSUMER_GROUP_NAME,
+                XGroupCreateArgs.Builder.mkstream()) // create the Redis stream if it does not exist
            .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.boundedElastic()))
.block();
- this.consumer = Optional.of(consumeWorkQueue());
+ this.consumer = Optional.of(consumeRedisStream());
}
- private Disposable consumeWorkQueue() {
- return Flux.using(
- receiverProvider::createReceiver,
-            receiver -> receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE)),
- Receiver::close)
- .filter(delivery -> Objects.nonNull(delivery.getBody()))
- .flatMap(this::deliver, EventBus.EXECUTION_RATE)
+ public Disposable consumeRedisStream() {
+        return Flux.interval(Duration.ofSeconds(1)) // polling for new messages every 1 second. Not sure it is a good idea. Anyway, just start simple...
Review Comment:
Maybe using `repeat` would be a better idea
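
For illustration, here is a minimal, self-contained sketch of the re-read-on-completion loop that Reactor's `repeat()` would express (plain JDK only, no Reactor or Lettuce; `boundedRead` is a hypothetical stand-in for an `XREADGROUP ... BLOCK <millis>` call): the next blocking read is issued as soon as the previous one returns, instead of waking up on a fixed 1-second tick.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class RepeatStylePolling {
    // Hypothetical stand-in for XREADGROUP ... BLOCK <millis>: waits up to
    // `millis` for one entry, returns null on timeout (like an empty reply).
    static String boundedRead(BlockingQueue<String> stream, long millis) {
        try {
            return stream.poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> stream = new ArrayBlockingQueue<>(16);
        stream.add("event-1");
        stream.add("event-2");

        // repeat()-style loop: re-issue the read immediately after the
        // previous one completes; a bounded BLOCK keeps the loop responsive
        // to shutdown, unlike BLOCK 0 which may park the reader forever.
        for (int read = 1; read <= 3; read++) {
            System.out.println("read " + read + ": " + boundedRead(stream, 50));
        }
        // read 1: event-1, read 2: event-2, read 3: null (timed out)
    }
}
```

In Reactor this loop would roughly correspond to `Flux.defer(() -> xreadgroup(...)).repeat()`, which resubscribes on completion rather than on a timer.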
##########
event-bus/redis/src/main/java/org/apache/james/events/GroupRegistrationHandler.java:
##########
@@ -104,68 +105,62 @@ GroupRegistration retrieveGroupRegistration(Group group) {
}
public void start() {
-        scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler");
- channelPool.createWorkQueue(
- QueueSpecification.queue(queueName.asString())
- .durable(DURABLE)
- .exclusive(!EXCLUSIVE)
- .autoDelete(!AUTO_DELETE)
-                .arguments(configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
- .deadLetter(namingStrategy.deadLetterExchange())
- .build()),
- BindingSpecification.binding()
- .exchange(namingStrategy.exchange())
- .queue(queueName.asString())
- .routingKey(EMPTY_ROUTING_KEY))
+        scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler"); // create thread pool for the event bus
+
+ redisStreamReactiveCommands
+ .xgroupCreate(XReadArgs.StreamOffset.from(streamName, "0-0"),
+ CONSUMER_GROUP_NAME,
+                XGroupCreateArgs.Builder.mkstream()) // create the Redis stream if it does not exist
            .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.boundedElastic()))
.block();
- this.consumer = Optional.of(consumeWorkQueue());
+ this.consumer = Optional.of(consumeRedisStream());
}
- private Disposable consumeWorkQueue() {
- return Flux.using(
- receiverProvider::createReceiver,
-            receiver -> receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE)),
- Receiver::close)
- .filter(delivery -> Objects.nonNull(delivery.getBody()))
- .flatMap(this::deliver, EventBus.EXECUTION_RATE)
+ public Disposable consumeRedisStream() {
+        return Flux.interval(Duration.ofSeconds(1)) // polling for new messages every 1 second. Not sure it is a good idea. Anyway, just start simple...
+ .flatMap(ignore -> redisStreamReactiveCommands.xreadgroup(
+ Consumer.from(CONSUMER_GROUP_NAME, consumerName),
+                XReadArgs.Builder.block(0), // waiting an unlimited time for new messages to avoid too much polling
Review Comment:
Maybe limiting the COUNT of messages would prevent a consumer from being
overwhelmed when too many messages are published in a short time, and would
distribute the load better between consumers.
But I am not sure whether the command blocks until the full COUNT (e.g. 10
messages) has been received. (Need to test.)
If it does not wait for the full 10 messages, then it is good to use COUNT.
Or change BLOCK to a non-forever duration.
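
For what it is worth, Redis documents COUNT for XREAD/XREADGROUP as an upper bound: a blocked read returns as soon as any entries are available, up to COUNT, rather than waiting for COUNT entries to accumulate. A minimal sketch of that batching semantic (plain JDK only; `readBatch` is a hypothetical stand-in for the server-side drain, not the Lettuce API):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class CountSemantics {
    // Hypothetical stand-in for XREADGROUP ... COUNT <n>: drains up to
    // `count` pending entries and returns immediately -- `count` is an
    // upper bound, not a quota the call waits to fill.
    static List<String> readBatch(Queue<String> stream, int count) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < count && !stream.isEmpty()) {
            batch.add(stream.poll());
        }
        return batch;
    }

    public static void main(String[] args) {
        Queue<String> stream = new ArrayDeque<>(List.of("e1", "e2", "e3"));
        // COUNT 10 with only 3 entries pending: a short batch, no waiting.
        System.out.println(readBatch(stream, 10)); // [e1, e2, e3]
        System.out.println(readBatch(stream, 10)); // [] -- nothing pending
    }
}
```

If that reading of COUNT is confirmed by testing, COUNT plus a bounded BLOCK would give both load distribution and a guaranteed return, as the comment suggests.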
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]