quantranhong1999 commented on code in PR #2028:
URL: https://github.com/apache/james-project/pull/2028#discussion_r1497367581


##########
event-bus/redis/src/main/java/org/apache/james/events/GroupRegistrationHandler.java:
##########
@@ -104,68 +105,62 @@ GroupRegistration retrieveGroupRegistration(Group group) {
     }
 
     public void start() {
-        scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, 
ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler");
-        channelPool.createWorkQueue(
-            QueueSpecification.queue(queueName.asString())
-                .durable(DURABLE)
-                .exclusive(!EXCLUSIVE)
-                .autoDelete(!AUTO_DELETE)
-                
.arguments(configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
-                    .deadLetter(namingStrategy.deadLetterExchange())
-                    .build()),
-            BindingSpecification.binding()
-                .exchange(namingStrategy.exchange())
-                .queue(queueName.asString())
-                .routingKey(EMPTY_ROUTING_KEY))
+        scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, 
ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler"); // create 
thread pool for Event bus
+
+        redisStreamReactiveCommands
+            .xgroupCreate(XReadArgs.StreamOffset.from(streamName, "0-0"),
+                CONSUMER_GROUP_NAME,
+                XGroupCreateArgs.Builder.mkstream()) // create Redis stream if 
not exist
             .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.boundedElastic()))
             .block();
 
-        this.consumer = Optional.of(consumeWorkQueue());
+        this.consumer = Optional.of(consumeRedisStream());
     }
 
-    private Disposable consumeWorkQueue() {
-        return Flux.using(
-                receiverProvider::createReceiver,
-            receiver -> receiver.consumeManualAck(queueName.asString(), new 
ConsumeOptions().qos(EventBus.EXECUTION_RATE)),
-            Receiver::close)
-            .filter(delivery -> Objects.nonNull(delivery.getBody()))
-            .flatMap(this::deliver, EventBus.EXECUTION_RATE)
+    public Disposable consumeRedisStream() {
+        return Flux.interval(Duration.ofSeconds(1)) // pooling for new 
messages every 1 second. not sure it would be a good idea. Anyway just start 
simple...

Review Comment:
   Maybe using `repeat` would be a better idea



##########
event-bus/redis/src/main/java/org/apache/james/events/GroupRegistrationHandler.java:
##########
@@ -104,68 +105,62 @@ GroupRegistration retrieveGroupRegistration(Group group) {
     }
 
     public void start() {
-        scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, 
ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler");
-        channelPool.createWorkQueue(
-            QueueSpecification.queue(queueName.asString())
-                .durable(DURABLE)
-                .exclusive(!EXCLUSIVE)
-                .autoDelete(!AUTO_DELETE)
-                
.arguments(configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
-                    .deadLetter(namingStrategy.deadLetterExchange())
-                    .build()),
-            BindingSpecification.binding()
-                .exchange(namingStrategy.exchange())
-                .queue(queueName.asString())
-                .routingKey(EMPTY_ROUTING_KEY))
+        scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, 
ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler"); // create 
thread pool for Event bus
+
+        redisStreamReactiveCommands
+            .xgroupCreate(XReadArgs.StreamOffset.from(streamName, "0-0"),
+                CONSUMER_GROUP_NAME,
+                XGroupCreateArgs.Builder.mkstream()) // create Redis stream if 
not exist
             .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.boundedElastic()))
             .block();
 
-        this.consumer = Optional.of(consumeWorkQueue());
+        this.consumer = Optional.of(consumeRedisStream());
     }
 
-    private Disposable consumeWorkQueue() {
-        return Flux.using(
-                receiverProvider::createReceiver,
-            receiver -> receiver.consumeManualAck(queueName.asString(), new 
ConsumeOptions().qos(EventBus.EXECUTION_RATE)),
-            Receiver::close)
-            .filter(delivery -> Objects.nonNull(delivery.getBody()))
-            .flatMap(this::deliver, EventBus.EXECUTION_RATE)
+    public Disposable consumeRedisStream() {
+        return Flux.interval(Duration.ofSeconds(1)) // pooling for new 
messages every 1 second. not sure it would be a good idea. Anyway just start 
simple...
+            .flatMap(ignore -> redisStreamReactiveCommands.xreadgroup(
+                Consumer.from(CONSUMER_GROUP_NAME, consumerName),
+                XReadArgs.Builder.block(0), // Waiting unlimited time for new 
messages to avoid too much pooling

Review Comment:
   maybe limiting the count of messages would prevent a consumer from being 
overwhelmed upon too many messages published in a short time, and distribute 
the load better between consumers.
   
   But I am not sure if the command would be blocked util received fully e.g. 
COUNT 10 messages yet. (need to test)
   If is not waiting for the full 10 messages, then it is good to use COUNT.
   
   Or change BLOCK to a non-forever duration.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to