chibenwa commented on code in PR #2028:
URL: https://github.com/apache/james-project/pull/2028#discussion_r1497401126


##########
event-bus/redis/src/main/java/org/apache/james/events/GroupRegistrationHandler.java:
##########
@@ -104,68 +105,62 @@ GroupRegistration retrieveGroupRegistration(Group group) {
     }
 
     public void start() {
-        scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, 
ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler");
-        channelPool.createWorkQueue(
-            QueueSpecification.queue(queueName.asString())
-                .durable(DURABLE)
-                .exclusive(!EXCLUSIVE)
-                .autoDelete(!AUTO_DELETE)
-                
.arguments(configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
-                    .deadLetter(namingStrategy.deadLetterExchange())
-                    .build()),
-            BindingSpecification.binding()
-                .exchange(namingStrategy.exchange())
-                .queue(queueName.asString())
-                .routingKey(EMPTY_ROUTING_KEY))
+        scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, 
ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler"); // create 
thread pool for Event bus
+
+        redisStreamReactiveCommands
+            .xgroupCreate(XReadArgs.StreamOffset.from(streamName, "0-0"),
+                CONSUMER_GROUP_NAME,
+                XGroupCreateArgs.Builder.mkstream()) // create Redis stream if 
not exist
             .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.boundedElastic()))
             .block();
 
-        this.consumer = Optional.of(consumeWorkQueue());
+        this.consumer = Optional.of(consumeRedisStream());
     }
 
-    private Disposable consumeWorkQueue() {
-        return Flux.using(
-                receiverProvider::createReceiver,
-            receiver -> receiver.consumeManualAck(queueName.asString(), new 
ConsumeOptions().qos(EventBus.EXECUTION_RATE)),
-            Receiver::close)
-            .filter(delivery -> Objects.nonNull(delivery.getBody()))
-            .flatMap(this::deliver, EventBus.EXECUTION_RATE)
+    public Disposable consumeRedisStream() {
+        return Flux.interval(Duration.ofSeconds(1)) // pooling for new 
messages every 1 second. not sure it would be a good idea. Anyway just start 
simple...

Review Comment:
   No "group" implem with REDIS



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to