quantranhong1999 commented on code in PR #2028:
URL: https://github.com/apache/james-project/pull/2028#discussion_r1497433283
##########
event-bus/redis/src/main/java/org/apache/james/events/GroupRegistrationHandler.java:
##########
@@ -104,68 +105,62 @@ GroupRegistration retrieveGroupRegistration(Group group) {
}
public void start() {
- scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE,
ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler");
- channelPool.createWorkQueue(
- QueueSpecification.queue(queueName.asString())
- .durable(DURABLE)
- .exclusive(!EXCLUSIVE)
- .autoDelete(!AUTO_DELETE)
-
.arguments(configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
- .deadLetter(namingStrategy.deadLetterExchange())
- .build()),
- BindingSpecification.binding()
- .exchange(namingStrategy.exchange())
- .queue(queueName.asString())
- .routingKey(EMPTY_ROUTING_KEY))
+ scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE,
ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler"); // create
thread pool for Event bus
+
+ redisStreamReactiveCommands
+ .xgroupCreate(XReadArgs.StreamOffset.from(streamName, "0-0"),
+ CONSUMER_GROUP_NAME,
+ XGroupCreateArgs.Builder.mkstream()) // create Redis stream if
not exist
.retryWhen(Retry.backoff(retryBackoff.getMaxRetries(),
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.boundedElastic()))
.block();
- this.consumer = Optional.of(consumeWorkQueue());
+ this.consumer = Optional.of(consumeRedisStream());
}
- private Disposable consumeWorkQueue() {
- return Flux.using(
- receiverProvider::createReceiver,
- receiver -> receiver.consumeManualAck(queueName.asString(), new
ConsumeOptions().qos(EventBus.EXECUTION_RATE)),
- Receiver::close)
- .filter(delivery -> Objects.nonNull(delivery.getBody()))
- .flatMap(this::deliver, EventBus.EXECUTION_RATE)
+ public Disposable consumeRedisStream() {
+ return Flux.interval(Duration.ofSeconds(1)) // pooling for new
messages every 1 second. not sure it would be a good idea. Anyway just start
simple...
Review Comment:
Ok, I will focus on the "key" impl with Redis.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]