This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 57afe79251 [FIX] Correct websocket metrics 57afe79251 is described below commit 57afe7925131b84d7e9327f8c641121d98a34689 Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Mon Dec 9 18:19:37 2024 +0100 [FIX] Correct websocket metrics The connection count was incremented and decremented as part of the initial HTTP exchange. This is wrong: it needs to be managed in the WebSocket phase. --- .../apache/james/jmap/routes/WebSocketRoutes.scala | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala index 6cd72f359c..1406fe87a3 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala @@ -103,15 +103,14 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato .flatMap((mailboxSession: MailboxSession) => userProvisioner.provisionUser(mailboxSession) .`then` .`then`(SMono(httpServerResponse.addHeader(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, "jmap") - .sendWebsocket((in, out) => handleWebSocketConnection(mailboxSession)(in, out))) - .doOnSubscribe(_ => openingConnectionsMetric.increment()) - .doOnTerminate(() => openingConnectionsMetric.decrement()))) + .sendWebsocket((in, out) => handleWebSocketConnection(mailboxSession)(in, out))))) .onErrorResume(throwable => handleHttpHandshakeError(throwable, httpServerResponse)) .asJava() .`then`() private def handleWebSocketConnection(session: MailboxSession)(in: WebsocketInbound, out: WebsocketOutbound): Mono[Void] = { val sink: Sinks.Many[OutboundMessage] = Sinks.many().unicast().onBackpressureBuffer() + openingConnectionsMetric.increment() val context = 
ClientContext(sink, new AtomicReference[Registration](), session) val responseFlux: SFlux[OutboundMessage] = SFlux[WebSocketFrame](in.aggregateFrames() @@ -124,10 +123,16 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato .doOnNext(_ => connectedUsers.put(context, context)) .doOnNext(_ => requestCountMetric.increment()) .flatMap(message => handleClientMessages(context)(message)) - .doOnTerminate(context.clean) - .doOnCancel(context.clean) - .doOnTerminate(() => connectedUsers.remove(context)) - .doOnCancel(() => connectedUsers.remove(context)) + .doOnTerminate(() => { + context.clean() + connectedUsers.remove(context) + openingConnectionsMetric.decrement() + }) + .doOnCancel(() => { + context.clean() + connectedUsers.remove(context) + openingConnectionsMetric.decrement() + }) out.sendString( SFlux.merge(Seq(responseFlux, sink.asFlux())) --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org