This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4f75a6fc574eb38f27a545561c82a04adaba51e8 Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Sun Nov 17 10:14:28 2024 +0100 JAMES-4090 Disconectors for JMAP websocket and event source --- .../james/jmap/rfc8621/RFC8621MethodsModule.java | 5 +++++ .../james/jmap/routes/EventSourceRoutes.scala | 20 +++++++++++++++++-- .../apache/james/jmap/routes/WebSocketRoutes.scala | 23 +++++++++++++++++++--- 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java index cbea6c24b6..05e789d9df 100644 --- a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java +++ b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.ex.ConfigurationException; +import org.apache.james.core.Disconnector; import org.apache.james.jmap.JMAPRoutes; import org.apache.james.jmap.JMAPRoutesHandler; import org.apache.james.jmap.Version; @@ -199,6 +200,10 @@ public class RFC8621MethodsModule extends AbstractModule { blobResolverMultibinder.addBinding().to(MessageBlobResolver.class); blobResolverMultibinder.addBinding().to(UploadResolver.class); blobResolverMultibinder.addBinding().to(MessagePartBlobResolver.class); + + Multibinder<Disconnector> disconnectorMultibinder = Multibinder.newSetBinder(binder(), Disconnector.class); + disconnectorMultibinder.addBinding().to(WebSocketRoutes.class); + disconnectorMultibinder.addBinding().to(EventSourceRoutes.class); } @ProvidesIntoSet diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala index a51f1de8a5..35f0105504 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala @@ -22,7 +22,6 @@ package org.apache.james.jmap.routes import java.nio.charset.StandardCharsets import java.util.concurrent.atomic.AtomicReference import java.util.stream - import cats.implicits._ import eu.timepit.refined.api.Refined import eu.timepit.refined.numeric.Positive @@ -30,6 +29,7 @@ import eu.timepit.refined.refineV import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE import io.netty.handler.codec.http.{HttpMethod, QueryStringDecoder} import jakarta.inject.{Inject, Named} +import org.apache.james.core.{Disconnector, Username} import org.apache.james.events.{EventBus, Registration, RegistrationKey} import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE import org.apache.james.jmap.JMAPUrls.EVENT_SOURCE @@ -159,7 +159,8 @@ class EventSourceRoutes@Inject() (@Named(InjectionKeys.RFC_8621) val authenticat @Named(JMAPInjectionKeys.JMAP) eventBus: EventBus, pushSerializer: PushSerializer, typeStateFactory: TypeStateFactory, - delegationStore: DelegationStore) extends JMAPRoutes { + delegationStore: DelegationStore) extends JMAPRoutes with Disconnector { + private val connectedUsers: java.util.concurrent.ConcurrentHashMap[ClientContext, ClientContext] = new java.util.concurrent.ConcurrentHashMap[ClientContext, ClientContext] override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of( JMAPRoute.builder @@ -185,6 +186,7 @@ class EventSourceRoutes@Inject() (@Named(InjectionKeys.RFC_8621) val authenticat private def registerSSE(response: HttpServerResponse, session: MailboxSession, options: EventSourceOptions): SMono[Unit] = { val sink: Sinks.Many[OutboundMessage] = Sinks.many().unicast().onBackpressureBuffer() val context = ClientContext(sink, new AtomicReference[Registration](), session) + connectedUsers.put(context, context) val pingDisposable = options.pingPolicy .asFlux() @@ -207,6 +209,7 @@ class EventSourceRoutes@Inject() (@Named(InjectionKeys.RFC_8621) val authenticat .map(asSSEEvent), StandardCharsets.UTF_8).`then` .doFinally(_ => context.clean()) + .doFinally(_ => connectedUsers.remove(context)) .doFinally(_ => pingDisposable.dispose()) .`then`()) .`then`() @@ -232,4 +235,17 @@ class EventSourceRoutes@Inject() (@Named(InjectionKeys.RFC_8621) val authenticat .sendString(SMono.fromCallable(() => ResponseSerializer.serialize(details).toString), StandardCharsets.UTF_8) .`then`) + + override def disconnect(username: Username): Unit = { + val contexts = connectedUsers.values() + .stream() + .filter(context => username.equals(context.session.getUser)) + .toList + + contexts + .forEach(context => { + context.clean() + connectedUsers.remove(context) + }) + } } diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala index a861a99be1..bd35b6f7c3 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala @@ -22,12 +22,11 @@ package org.apache.james.jmap.routes import java.nio.charset.StandardCharsets import java.util.concurrent.atomic.AtomicReference import java.util.stream - import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE import io.netty.handler.codec.http.websocketx.WebSocketFrame import io.netty.handler.codec.http.{HttpHeaderNames, HttpMethod} import jakarta.inject.{Inject, Named} -import org.apache.james.core.Username +import org.apache.james.core.{Disconnector, Username} import org.apache.james.events.{EventBus, Registration, RegistrationKey} import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE import org.apache.james.jmap.JMAPUrls.JMAP_WS @@ -81,9 +80,10 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato pushSerializer: PushSerializer, typeStateFactory: TypeStateFactory, delegationStore: DelegationStore, - metricFactory: MetricFactory) extends JMAPRoutes { + metricFactory: MetricFactory) extends JMAPRoutes with Disconnector { private val openingConnectionsMetric: Metric = metricFactory.generate("jmap_websocket_opening_connections_count") private val requestCountMetric: Metric = metricFactory.generate("jmap_websocket_requests_count") + private val connectedUsers: java.util.concurrent.ConcurrentHashMap[ClientContext, ClientContext] = new java.util.concurrent.ConcurrentHashMap[ClientContext, ClientContext] override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of( JMAPRoute.builder @@ -118,10 +118,13 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato frame.content().readBytes(bytes) new String(bytes, StandardCharsets.UTF_8) }) + .doOnNext(_ => connectedUsers.put(context, context)) .doOnNext(_ => requestCountMetric.increment()) .flatMap(message => handleClientMessages(context)(message)) .doOnTerminate(context.clean) .doOnCancel(context.clean) + .doOnTerminate(() => connectedUsers.remove(context)) + .doOnCancel(() => connectedUsers.remove(context)) out.sendString( SFlux.merge(Seq(responseFlux, sink.asFlux())) @@ -151,6 +154,7 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato .doOnNext(newRegistration => clientContext.withRegistration(newRegistration)) .`then`(sendPushStateIfRequested(pushEnable, clientContext)) case WebSocketPushDisable => SMono.fromCallable(() => clientContext.clean()) + .`then`(SMono.fromCallable(() => connectedUsers.remove(clientContext))) .`then`(SMono.empty) }) @@ -190,4 +194,17 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato .sendString(SMono.fromCallable(() => ResponseSerializer.serialize(details).toString), StandardCharsets.UTF_8) .`then`) + + override def disconnect(username: Username): Unit = { + val contexts = connectedUsers.values() + .stream() + .filter(context => username.equals(context.session.getUser)) + .toList + + contexts + .forEach(context => { + context.clean() + connectedUsers.remove(context) + }) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org