This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 57afe79251 [FIX] Correct websocket metrics
57afe79251 is described below

commit 57afe7925131b84d7e9327f8c641121d98a34689
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Mon Dec 9 18:19:37 2024 +0100

    [FIX] Correct websocket metrics
    
    The connection count was incremented and decremented as part
    of the initial HTTP exchange.
    
    This is wrong: it needs to be managed during the WebSocket phase.
---
 .../apache/james/jmap/routes/WebSocketRoutes.scala    | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index 6cd72f359c..1406fe87a3 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -103,15 +103,14 @@ class WebSocketRoutes @Inject() 
(@Named(InjectionKeys.RFC_8621) val authenticato
       .flatMap((mailboxSession: MailboxSession) => 
userProvisioner.provisionUser(mailboxSession)
         .`then`
         
.`then`(SMono(httpServerResponse.addHeader(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL,
 "jmap")
-          .sendWebsocket((in, out) => 
handleWebSocketConnection(mailboxSession)(in, out)))
-          .doOnSubscribe(_ => openingConnectionsMetric.increment())
-          .doOnTerminate(() => openingConnectionsMetric.decrement())))
+          .sendWebsocket((in, out) => 
handleWebSocketConnection(mailboxSession)(in, out)))))
       .onErrorResume(throwable => handleHttpHandshakeError(throwable, 
httpServerResponse))
       .asJava()
       .`then`()
 
   private def handleWebSocketConnection(session: MailboxSession)(in: 
WebsocketInbound, out: WebsocketOutbound): Mono[Void] = {
     val sink: Sinks.Many[OutboundMessage] = 
Sinks.many().unicast().onBackpressureBuffer()
+    openingConnectionsMetric.increment()
 
     val context = ClientContext(sink, new AtomicReference[Registration](), 
session)
     val responseFlux: SFlux[OutboundMessage] = 
SFlux[WebSocketFrame](in.aggregateFrames()
@@ -124,10 +123,16 @@ class WebSocketRoutes @Inject() 
(@Named(InjectionKeys.RFC_8621) val authenticato
       .doOnNext(_ => connectedUsers.put(context, context))
       .doOnNext(_ => requestCountMetric.increment())
       .flatMap(message => handleClientMessages(context)(message))
-      .doOnTerminate(context.clean)
-      .doOnCancel(context.clean)
-      .doOnTerminate(() => connectedUsers.remove(context))
-      .doOnCancel(() => connectedUsers.remove(context))
+      .doOnTerminate(() => {
+        context.clean()
+        connectedUsers.remove(context)
+        openingConnectionsMetric.decrement()
+      })
+      .doOnCancel(() => {
+        context.clean()
+        connectedUsers.remove(context)
+        openingConnectionsMetric.decrement()
+      })
 
     out.sendString(
       SFlux.merge(Seq(responseFlux, sink.asFlux()))


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to