This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4f75a6fc574eb38f27a545561c82a04adaba51e8
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Sun Nov 17 10:14:28 2024 +0100

    JAMES-4090 Disconectors for JMAP websocket and event source
---
 .../james/jmap/rfc8621/RFC8621MethodsModule.java   |  5 +++++
 .../james/jmap/routes/EventSourceRoutes.scala      | 20 +++++++++++++++++--
 .../apache/james/jmap/routes/WebSocketRoutes.scala | 23 +++++++++++++++++++---
 3 files changed, 43 insertions(+), 5 deletions(-)

diff --git 
a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
 
b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
index cbea6c24b6..05e789d9df 100644
--- 
a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
+++ 
b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
@@ -27,6 +27,7 @@ import java.util.Set;
 
 import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.apache.james.core.Disconnector;
 import org.apache.james.jmap.JMAPRoutes;
 import org.apache.james.jmap.JMAPRoutesHandler;
 import org.apache.james.jmap.Version;
@@ -199,6 +200,10 @@ public class RFC8621MethodsModule extends AbstractModule {
         blobResolverMultibinder.addBinding().to(MessageBlobResolver.class);
         blobResolverMultibinder.addBinding().to(UploadResolver.class);
         blobResolverMultibinder.addBinding().to(MessagePartBlobResolver.class);
+
+        Multibinder<Disconnector> disconnectorMultibinder = 
Multibinder.newSetBinder(binder(), Disconnector.class);
+        disconnectorMultibinder.addBinding().to(WebSocketRoutes.class);
+        disconnectorMultibinder.addBinding().to(EventSourceRoutes.class);
     }
 
     @ProvidesIntoSet
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
index a51f1de8a5..35f0105504 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
@@ -22,7 +22,6 @@ package org.apache.james.jmap.routes
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicReference
 import java.util.stream
-
 import cats.implicits._
 import eu.timepit.refined.api.Refined
 import eu.timepit.refined.numeric.Positive
@@ -30,6 +29,7 @@ import eu.timepit.refined.refineV
 import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
 import io.netty.handler.codec.http.{HttpMethod, QueryStringDecoder}
 import jakarta.inject.{Inject, Named}
+import org.apache.james.core.{Disconnector, Username}
 import org.apache.james.events.{EventBus, Registration, RegistrationKey}
 import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
 import org.apache.james.jmap.JMAPUrls.EVENT_SOURCE
@@ -159,7 +159,8 @@ class EventSourceRoutes@Inject() 
(@Named(InjectionKeys.RFC_8621) val authenticat
                                   @Named(JMAPInjectionKeys.JMAP) eventBus: 
EventBus,
                                   pushSerializer: PushSerializer,
                                   typeStateFactory: TypeStateFactory,
-                                  delegationStore: DelegationStore) extends 
JMAPRoutes {
+                                  delegationStore: DelegationStore) extends 
JMAPRoutes with Disconnector {
+  private val connectedUsers: 
java.util.concurrent.ConcurrentHashMap[ClientContext, ClientContext] = new 
java.util.concurrent.ConcurrentHashMap[ClientContext, ClientContext]
 
   override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
     JMAPRoute.builder
@@ -185,6 +186,7 @@ class EventSourceRoutes@Inject() 
(@Named(InjectionKeys.RFC_8621) val authenticat
   private def registerSSE(response: HttpServerResponse, session: 
MailboxSession, options: EventSourceOptions): SMono[Unit] = {
     val sink: Sinks.Many[OutboundMessage] = 
Sinks.many().unicast().onBackpressureBuffer()
     val context = ClientContext(sink, new AtomicReference[Registration](), 
session)
+    connectedUsers.put(context, context)
 
     val pingDisposable = options.pingPolicy
       .asFlux()
@@ -207,6 +209,7 @@ class EventSourceRoutes@Inject() 
(@Named(InjectionKeys.RFC_8621) val authenticat
           .map(asSSEEvent),
         StandardCharsets.UTF_8).`then`
       .doFinally(_ => context.clean())
+      .doFinally(_ => connectedUsers.remove(context))
       .doFinally(_ => pingDisposable.dispose())
       .`then`())
       .`then`()
@@ -232,4 +235,17 @@ class EventSourceRoutes@Inject() 
(@Named(InjectionKeys.RFC_8621) val authenticat
       .sendString(SMono.fromCallable(() => 
ResponseSerializer.serialize(details).toString),
         StandardCharsets.UTF_8)
       .`then`)
+
+  override def disconnect(username: Username): Unit = {
+    val contexts = connectedUsers.values()
+      .stream()
+      .filter(context => username.equals(context.session.getUser))
+      .toList
+
+    contexts
+      .forEach(context => {
+        context.clean()
+        connectedUsers.remove(context)
+      })
+  }
 }
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index a861a99be1..bd35b6f7c3 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -22,12 +22,11 @@ package org.apache.james.jmap.routes
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicReference
 import java.util.stream
-
 import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
 import io.netty.handler.codec.http.websocketx.WebSocketFrame
 import io.netty.handler.codec.http.{HttpHeaderNames, HttpMethod}
 import jakarta.inject.{Inject, Named}
-import org.apache.james.core.Username
+import org.apache.james.core.{Disconnector, Username}
 import org.apache.james.events.{EventBus, Registration, RegistrationKey}
 import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
 import org.apache.james.jmap.JMAPUrls.JMAP_WS
@@ -81,9 +80,10 @@ class WebSocketRoutes @Inject() 
(@Named(InjectionKeys.RFC_8621) val authenticato
                                  pushSerializer: PushSerializer,
                                  typeStateFactory: TypeStateFactory,
                                  delegationStore: DelegationStore,
-                                 metricFactory: MetricFactory) extends 
JMAPRoutes {
+                                 metricFactory: MetricFactory) extends 
JMAPRoutes with Disconnector {
   private val openingConnectionsMetric: Metric = 
metricFactory.generate("jmap_websocket_opening_connections_count")
   private val requestCountMetric: Metric = 
metricFactory.generate("jmap_websocket_requests_count")
+  private val connectedUsers: 
java.util.concurrent.ConcurrentHashMap[ClientContext, ClientContext] = new 
java.util.concurrent.ConcurrentHashMap[ClientContext, ClientContext]
 
   override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
     JMAPRoute.builder
@@ -118,10 +118,13 @@ class WebSocketRoutes @Inject() 
(@Named(InjectionKeys.RFC_8621) val authenticato
         frame.content().readBytes(bytes)
         new String(bytes, StandardCharsets.UTF_8)
       })
+      .doOnNext(_ => connectedUsers.put(context, context))
       .doOnNext(_ => requestCountMetric.increment())
       .flatMap(message => handleClientMessages(context)(message))
       .doOnTerminate(context.clean)
       .doOnCancel(context.clean)
+      .doOnTerminate(() => connectedUsers.remove(context))
+      .doOnCancel(() => connectedUsers.remove(context))
 
     out.sendString(
       SFlux.merge(Seq(responseFlux, sink.asFlux()))
@@ -151,6 +154,7 @@ class WebSocketRoutes @Inject() 
(@Named(InjectionKeys.RFC_8621) val authenticato
               .doOnNext(newRegistration => 
clientContext.withRegistration(newRegistration))
               .`then`(sendPushStateIfRequested(pushEnable, clientContext))
           case WebSocketPushDisable => SMono.fromCallable(() => 
clientContext.clean())
+            .`then`(SMono.fromCallable(() => 
connectedUsers.remove(clientContext)))
           .`then`(SMono.empty)
       })
 
@@ -190,4 +194,17 @@ class WebSocketRoutes @Inject() 
(@Named(InjectionKeys.RFC_8621) val authenticato
       .sendString(SMono.fromCallable(() => 
ResponseSerializer.serialize(details).toString),
         StandardCharsets.UTF_8)
       .`then`)
+
+  override def disconnect(username: Username): Unit = {
+    val contexts = connectedUsers.values()
+      .stream()
+      .filter(context => username.equals(context.session.getUser))
+      .toList
+
+    contexts
+      .forEach(context => {
+        context.clean()
+        connectedUsers.remove(context)
+      })
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to