This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 48d1361172 JAMES-3491 - JMAP WebSockets - support ping interval (#2561) 48d1361172 is described below commit 48d1361172e9fe58bb2bba281a3f018e205554c4 Author: vttran <vtt...@linagora.com> AuthorDate: Wed Dec 18 09:14:50 2024 +0700 JAMES-3491 - JMAP WebSockets - support ping interval (#2561) --- docs/modules/servers/partials/configure/jmap.adoc | 4 + .../apache/james/modules/TestJMAPServerModule.java | 40 +++- .../WebSocketWithPingIntervalContract.scala | 227 +++++++++++++++++++++ .../MemoryWebSocketWithPingIntervalTest.java | 65 ++++++ .../james/jmap/core/JmapRfc8621Configuration.scala | 14 +- .../apache/james/jmap/routes/WebSocketRoutes.scala | 34 +-- src/site/xdoc/server/config-jmap.xml | 5 + 7 files changed, 374 insertions(+), 15 deletions(-) diff --git a/docs/modules/servers/partials/configure/jmap.adoc b/docs/modules/servers/partials/configure/jmap.adoc index c5f697c53f..b2f0419844 100644 --- a/docs/modules/servers/partials/configure/jmap.adoc +++ b/docs/modules/servers/partials/configure/jmap.adoc @@ -33,6 +33,10 @@ Defaults to an empty list. | websocket.url.prefix | Optional. URL for JMAP WebSocket route. Default value: ws://localhost +| websocket.ping.interval +| Optional. Configure the duration of the interval between consecutive ping messages (as specified in RFC6455) sent by the server to the client over a WebSocket connection. +The supported unit is seconds (e.g: `3s` for a 3-second interval). Default is empty, this feature is disabled. + | email.send.max.size | Optional. Configuration max size for message created in RFC-8621. Default value: None. Supported units are B (bytes) K (KB) M (MB) G (GB). diff --git a/server/container/guice/protocols/jmap/src/test/java/org/apache/james/modules/TestJMAPServerModule.java b/server/container/guice/protocols/jmap/src/test/java/org/apache/james/modules/TestJMAPServerModule.java index 7b70263afc..8e27983cbb 100644 --- a/server/container/guice/protocols/jmap/src/test/java/org/apache/james/modules/TestJMAPServerModule.java +++ b/server/container/guice/protocols/jmap/src/test/java/org/apache/james/modules/TestJMAPServerModule.java @@ -19,23 +19,51 @@ package org.apache.james.modules; +import static org.apache.james.jmap.core.JmapRfc8621Configuration.LOCALHOST_CONFIGURATION; + import java.io.FileNotFoundException; +import java.util.Map; import java.util.Optional; import jakarta.inject.Named; import jakarta.inject.Singleton; +import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.james.jmap.JMAPConfiguration; +import org.apache.james.jmap.core.JmapRfc8621Configuration; import org.apache.james.jwt.JwtConfiguration; import org.apache.james.jwt.JwtTokenVerifier; import org.apache.james.modules.mailbox.FastRetryBackoffModule; +import org.apache.james.utils.PropertiesProvider; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.inject.AbstractModule; import com.google.inject.Provides; public class TestJMAPServerModule extends AbstractModule { + static class JmapRfc8621ConfigurationOverrideModule extends AbstractModule { + private final Map<String, Object> overrideJmapProperties; + + JmapRfc8621ConfigurationOverrideModule(Map<String, Object> overrideJmapProperties) { + this.overrideJmapProperties = overrideJmapProperties; + } + + @Provides + @Singleton + JmapRfc8621Configuration provideConfiguration(PropertiesProvider propertiesProvider) throws ConfigurationException { + try { + Configuration configuration = propertiesProvider.getConfiguration("jmap"); + overrideJmapProperties.forEach(configuration::setProperty); + return JmapRfc8621Configuration.from(configuration); + } catch (FileNotFoundException e) { + return LOCALHOST_CONFIGURATION(); + } + } + } + + private final Map<String, Object> overrideJmapProperties; private static final String PUBLIC_PEM_KEY = "-----BEGIN PUBLIC KEY-----\n" + @@ -80,15 +108,25 @@ public class TestJMAPServerModule extends AbstractModule { "ICQil1aaN7/2au+p7E4n7nzfYG7nRX5syDoqgBbdhpJxV8/5ohA=\n" + "-----END RSA PRIVATE KEY-----\n"; + public TestJMAPServerModule(Map<String, Object> overrideJmapProperties) { + this.overrideJmapProperties = overrideJmapProperties; + } + + public TestJMAPServerModule() { + this(ImmutableMap.of()); + } @Override protected void configure() { install(new FastRetryBackoffModule()); + if (!overrideJmapProperties.isEmpty()) { + install(new JmapRfc8621ConfigurationOverrideModule(overrideJmapProperties)); + } } @Provides @Singleton - JMAPConfiguration provideConfiguration() throws FileNotFoundException, ConfigurationException { + JMAPConfiguration provideConfiguration() { return JMAPConfiguration.builder() .enable() .randomPort() diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketWithPingIntervalContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketWithPingIntervalContract.scala new file mode 100644 index 0000000000..7e199ca6fa --- /dev/null +++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketWithPingIntervalContract.scala @@ -0,0 +1,227 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.jmap.rfc8621.contract + +import java.net.URI +import java.util.UUID + +import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson +import okhttp3.OkHttpClient +import org.apache.james.GuiceJamesServer +import org.apache.james.jmap.JmapGuiceProbe +import org.apache.james.jmap.api.change.State +import org.apache.james.jmap.api.model.AccountId +import org.apache.james.jmap.core.{PushState, UuidState} +import org.apache.james.jmap.rfc8621.contract.Fixture._ +import org.apache.james.mailbox.model.MailboxPath +import org.apache.james.modules.MailboxProbeImpl +import org.apache.james.utils.DataProbeImpl +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.{AfterEach, Test, Timeout} +import sttp.capabilities.WebSockets +import sttp.client3.monad.IdMonad +import sttp.client3.okhttp.OkHttpSyncBackend +import sttp.client3.{Identity, RequestT, SttpBackend, asWebSocket, basicRequest} +import sttp.model.Uri +import sttp.monad.MonadError +import sttp.ws.WebSocketFrame.Text +import sttp.ws.{WebSocket, WebSocketFrame} + +import scala.jdk.CollectionConverters._ + +trait WebSocketWithPingIntervalContract { + private lazy val backend: SttpBackend[Identity, WebSockets] = OkHttpSyncBackend() + private lazy implicit val monadError: MonadError[Identity] = IdMonad + + def startJmapServer(overrideJmapProperties: Map[String, Object]): GuiceJamesServer + + def stopJmapServer(): Unit + + @AfterEach + def afterEach(): Unit = { + stopJmapServer() + } + + private def setUpJmapServer(overrideJmapProperties: Map[String, Object] = Map.empty): GuiceJamesServer = { + val server = startJmapServer(overrideJmapProperties) + server.getProbe(classOf[DataProbeImpl]) + .fluent() + .addDomain(DOMAIN.asString()) + .addUser(ANDRE.asString(), ANDRE_PASSWORD) + .addUser(BOB.asString(), BOB_PASSWORD) + .addUser(DAVID.asString(), "secret") + server + } + + @Test + @Timeout(180) + def apiRequestsShouldBeProcessedWhenClientPingInterval(): Unit = { + val server = setUpJmapServer() + // Given client sends PING frame interval 2s + val intervalDurationInMillis = 2000 + val backend: SttpBackend[Identity, WebSockets] = OkHttpSyncBackend.usingClient(new OkHttpClient.Builder() + .pingInterval(intervalDurationInMillis, java.util.concurrent.TimeUnit.MILLISECONDS) + .build()) + + // The websocket connection is keep alive during client ping interval + val response: Either[String, WebSocketFrame] = authenticatedRequest(server) + .response(asWebSocket[Identity, WebSocketFrame] { ws: WebSocket[Identity] => + sendEchoTextFrame(ws) + Thread.sleep(intervalDurationInMillis * 3) + ws.receive() + }) + .send(backend) + .body + + val responseAsFrame = response.toOption.get + assertThat(responseAsFrame).isInstanceOf(classOf[WebSocketFrame.Text]) + assertThatJson(responseAsFrame.asPayload) + .isEqualTo( + """{ + | "@type":"Response", + | "requestId":"req-36", + | "sessionState":"2c9f1b12-b35a-43e6-9af2-0106fb53a943", + | "methodResponses":[ + | ["Core/echo", + | { + | "arg1":"arg1data", + | "arg2":"arg2data" + | },"c1"] + | ] + |} + |""".stripMargin) + } + + @Test + @Timeout(180) + def apiRequestsShouldBeProcessedWhenConfigurePingIntervalResponse(): Unit = { + // Given a server with configured ping interval of 2s + val server = setUpJmapServer(Map("websocket.ping.interval" -> "2s")) + + val requestId1 = UUID.randomUUID().toString + val requestId2 = UUID.randomUUID().toString + val response: Either[String, List[WebSocketFrame]] = + authenticatedRequest(server) + .response(asWebSocket[Identity, List[WebSocketFrame]] { + ws => + sendEchoTextFrame(ws, requestId1) + Thread.sleep(2000) + val frame1 = ws.receive() + sendEchoTextFrame(ws, requestId2) + Thread.sleep(2000) + val frame2 = ws.receive() + List(frame1, frame2) + }) + .send(backend) + .body + + val listResponseFrame = response.toOption.get.map(_.asInstanceOf[Text]).map(_.payload) + assertThat(listResponseFrame.asJava).hasSize(2) + assertThat(listResponseFrame.filter(frame => frame.contains(requestId1)).asJava).hasSize(1) + assertThat(listResponseFrame.filter(frame => frame.contains(requestId2)).asJava).hasSize(1) + } + + @Test + @Timeout(180) + def pushEnableRequestsShouldBeProcessedWhenConfigurePingIntervalResponse(): Unit = { + val server = setUpJmapServer(Map("websocket.ping.interval" -> "2s")) + + val bobPath = MailboxPath.inbox(BOB) + val accountId: AccountId = AccountId.fromUsername(BOB) + val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath) + + val response: Either[String, List[String]] = + authenticatedRequest(server) + .response(asWebSocket[Identity, List[String]] { + ws => + ws.send(WebSocketFrame.text( + """{ + | "@type": "WebSocketPushEnable", + | "dataTypes": ["Mailbox", "Email"] + |}""".stripMargin)) + + Thread.sleep(100) + + ws.send(WebSocketFrame.text( + s"""{ + | "@type": "Request", + | "id": "req-36", + | "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"], + | "methodCalls": [ + | ["Email/set", { + | "accountId": "$ACCOUNT_ID", + | "create": { + | "aaaaaa":{ + | "mailboxIds": { + | "${mailboxId.serialize}": true + | } + | } + | } + | }, "c1"]] + |}""".stripMargin)) + + List(ws.receive().asPayload, + ws.receive().asPayload) + }) + .send(backend) + .body + + Thread.sleep(100) + + val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe]) + val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId) + val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId) + + val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value + val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin + + assertThat(response.toOption.get.asJava) + .hasSize(2) + .contains(stateChange) + } + + private def authenticatedRequest(server: GuiceJamesServer): RequestT[Identity, Either[String, String], Any] = { + val port = server.getProbe(classOf[JmapGuiceProbe]) + .getJmapPort + .getValue + basicRequest.get(Uri.apply(new URI(s"ws://127.0.0.1:$port/jmap/ws"))) + .header("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=") + .header("Accept", ACCEPT_RFC8621_VERSION_HEADER) + } + + private def sendEchoTextFrame(ws: WebSocket[Identity], requestId: String = "req-36"): Identity[Unit] = { + ws.send(WebSocketFrame.text( + s"""{ + | "@type": "Request", + | "id": "$requestId", + | "using": [ "urn:ietf:params:jmap:core"], + | "methodCalls": [ + | [ + | "Core/echo", + | { + | "arg1": "arg1data", + | "arg2": "arg2data" + | }, + | "c1" + | ] + | ] + |}""".stripMargin)) + } +} diff --git a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketWithPingIntervalTest.java b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketWithPingIntervalTest.java new file mode 100644 index 0000000000..df7fa61580 --- /dev/null +++ b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketWithPingIntervalTest.java @@ -0,0 +1,65 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.jmap.rfc8621.memory; + +import static org.apache.james.data.UsersRepositoryModuleChooser.Implementation.DEFAULT; + +import java.io.File; + +import org.apache.james.GuiceJamesServer; +import org.apache.james.MemoryJamesConfiguration; +import org.apache.james.MemoryJamesServerMain; +import org.apache.james.jmap.rfc8621.contract.IdentityProbeModule; +import org.apache.james.jmap.rfc8621.contract.WebSocketWithPingIntervalContract; +import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule; +import org.apache.james.modules.TestJMAPServerModule; +import org.junit.jupiter.api.io.TempDir; + +import com.github.fge.lambdas.Throwing; + +import scala.collection.immutable.Map; +import scala.jdk.javaapi.CollectionConverters; + +public class MemoryWebSocketWithPingIntervalTest implements WebSocketWithPingIntervalContract { + @TempDir + private File tmpDir; + + private GuiceJamesServer guiceJamesServer; + + @Override + public GuiceJamesServer startJmapServer(Map<String, Object> overrideJmapProperties) { + guiceJamesServer = MemoryJamesServerMain.createServer(MemoryJamesConfiguration.builder() + .workingDirectory(tmpDir) + .configurationFromClasspath() + .usersRepository(DEFAULT) + .enableJMAP() + .build()) + .overrideWith(new TestJMAPServerModule(CollectionConverters.asJava(overrideJmapProperties)), new DelegationProbeModule(), new IdentityProbeModule()); + Throwing.runnable(() -> guiceJamesServer.start()).run(); + return guiceJamesServer; + } + + @Override + public void stopJmapServer() { + if (guiceJamesServer != null && guiceJamesServer.isStarted()) { + guiceJamesServer.stop(); + } + } +} diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala index 0c66fb16e7..c9f5098aa7 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala @@ -20,16 +20,20 @@ package org.apache.james.jmap.core import java.net.URI +import java.time.temporal.ChronoUnit import java.util.Optional +import com.google.common.base.Preconditions import com.google.common.collect.ImmutableList import org.apache.commons.configuration2.Configuration import org.apache.james.jmap.core.CapabilityIdentifier.CapabilityIdentifier import org.apache.james.jmap.core.JmapRfc8621Configuration.{JMAP_EMAIL_GET_FULL_MAX_SIZE_DEFAULT, JMAP_MAX_OBJECT_IN_GET, JMAP_MAX_OBJECT_IN_SET, JMAP_UPLOAD_QUOTA_LIMIT_DEFAULT, MAX_SIZE_ATTACHMENTS_PER_MAIL_DEFAULT, UPLOAD_LIMIT_DEFAULT} import org.apache.james.jmap.pushsubscription.PushClientConfiguration -import org.apache.james.util.Size +import org.apache.james.util.{DurationParser, Size} +import scala.concurrent.duration.Duration import scala.jdk.CollectionConverters._ +import scala.jdk.DurationConverters._ import scala.jdk.OptionConverters._ object JmapConfigProperties { @@ -37,6 +41,7 @@ object JmapConfigProperties { val MAX_SIZE_ATTACHMENTS_PER_MAIL_PROPERTY: String = "max.size.attachments.per.mail" val URL_PREFIX_PROPERTY: String = "url.prefix" val WEBSOCKET_URL_PREFIX_PROPERTY: String = "websocket.url.prefix" + val WEBSOCKET_PING_INTERVAL_PROPERTY: String = "websocket.ping.interval" val WEB_PUSH_MAX_TIMEOUT_SECONDS_PROPERTY: String = "webpush.maxTimeoutSeconds" val WEB_PUSH_MAX_CONNECTIONS_PROPERTY: String = "webpush.maxConnections" val WEB_PUSH_PREVENT_SERVER_SIDE_REQUEST_FORGERY: String = "webpush.prevent.server.side.request.forgery" @@ -70,6 +75,12 @@ object JmapRfc8621Configuration { JmapRfc8621Configuration( urlPrefixString = Option(configuration.getString(URL_PREFIX_PROPERTY)).getOrElse(URL_PREFIX_DEFAULT), websocketPrefixString = Option(configuration.getString(WEBSOCKET_URL_PREFIX_PROPERTY)).getOrElse(WEBSOCKET_URL_PREFIX_DEFAULT), + websocketPingInterval = Option(configuration.getString(WEBSOCKET_PING_INTERVAL_PROPERTY)) + .map(DurationParser.parse(_, ChronoUnit.SECONDS)) + .map(duration => { + Preconditions.checkArgument(!duration.isZero && !duration.isNegative, s"`$WEBSOCKET_PING_INTERVAL_PROPERTY` must be positive".asInstanceOf[Object]) + duration.toScala + }), dynamicJmapPrefixResolutionEnabled = configuration.getBoolean(DYNAMIC_JMAP_PREFIX_RESOLUTION_ENABLED_PROPERTY, false), supportsDelaySends = configuration.getBoolean(DELAY_SENDS_ENABLED, false), maxUploadSize = Option(configuration.getString(UPLOAD_LIMIT_PROPERTY, null)) @@ -107,6 +118,7 @@ object JmapRfc8621Configuration { case class JmapRfc8621Configuration(urlPrefixString: String, websocketPrefixString: String, + websocketPingInterval: Option[Duration] = None, dynamicJmapPrefixResolutionEnabled: Boolean = false, supportsDelaySends: Boolean = false, maxUploadSize: MaxSizeUpload = UPLOAD_LIMIT_DEFAULT, diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala index 1406fe87a3..fa8f92f09f 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala @@ -26,7 +26,7 @@ import java.util.{Optional, stream} import com.google.common.collect.ImmutableMap import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE -import io.netty.handler.codec.http.websocketx.WebSocketFrame +import io.netty.handler.codec.http.websocketx.{PingWebSocketFrame, TextWebSocketFrame, WebSocketFrame} import io.netty.handler.codec.http.{HttpHeaderNames, HttpMethod} import jakarta.inject.{Inject, Named} import org.apache.james.core.{ConnectionDescription, ConnectionDescriptionSupplier, Disconnector, Username} @@ -51,9 +51,10 @@ import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST import reactor.core.publisher.{Mono, Sinks} import reactor.core.scala.publisher.{SFlux, SMono} import reactor.core.scheduler.Schedulers -import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse} +import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse, WebsocketServerSpec} import reactor.netty.http.websocket.{WebsocketInbound, WebsocketOutbound} +import scala.concurrent.duration.Duration import scala.jdk.CollectionConverters._ object WebSocketRoutes { @@ -75,6 +76,7 @@ case class ClientContext(outbound: Sinks.Many[OutboundMessage], pushRegistration } class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator, + val configuration: JmapRfc8621Configuration, userProvisioner: UserProvisioning, @Named(JMAPInjectionKeys.JMAP) eventBus: EventBus, jmapApi: JMAPApi, @@ -87,6 +89,7 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato private val openingConnectionsMetric: Metric = metricFactory.generate("jmap_websocket_opening_connections_count") private val requestCountMetric: Metric = metricFactory.generate("jmap_websocket_requests_count") private val connectedUsers: java.util.concurrent.ConcurrentHashMap[ClientContext, ClientContext] = new java.util.concurrent.ConcurrentHashMap[ClientContext, ClientContext] + private val websocketServerSpec: WebsocketServerSpec = WebsocketServerSpec.builder.handlePing(false).build override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of( JMAPRoute.builder @@ -103,7 +106,7 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato .flatMap((mailboxSession: MailboxSession) => userProvisioner.provisionUser(mailboxSession) .`then` .`then`(SMono(httpServerResponse.addHeader(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL, "jmap") - .sendWebsocket((in, out) => handleWebSocketConnection(mailboxSession)(in, out))))) + .sendWebsocket((in: WebsocketInbound, out: WebsocketOutbound) => handleWebSocketConnection(mailboxSession)(in, out), websocketServerSpec)))) .onErrorResume(throwable => handleHttpHandshakeError(throwable, httpServerResponse)) .asJava() .`then`() @@ -115,11 +118,8 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato val context = ClientContext(sink, new AtomicReference[Registration](), session) val responseFlux: SFlux[OutboundMessage] = SFlux[WebSocketFrame](in.aggregateFrames() .receiveFrames()) - .map(frame => { - val bytes = new Array[Byte](frame.content().readableBytes) - frame.content().readBytes(bytes) - new String(bytes, StandardCharsets.UTF_8) - }) + .filter(frame => frame.isInstanceOf[TextWebSocketFrame]) + .map(frame => frame.asInstanceOf[TextWebSocketFrame].text()) .doOnNext(_ => connectedUsers.put(context, context)) .doOnNext(_ => requestCountMetric.increment()) .flatMap(message => handleClientMessages(context)(message)) @@ -134,13 +134,21 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato openingConnectionsMetric.decrement() }) - out.sendString( - SFlux.merge(Seq(responseFlux, sink.asFlux())) - .map(pushSerializer.serialize) - .map(Json.stringify)) - .`then`() + val responseAndSinkFlux: SFlux[WebSocketFrame] = SFlux.merge(Seq(responseFlux, sink.asFlux())) + .map(pushSerializer.serialize) + .map(json => new TextWebSocketFrame(Json.stringify(json))) + + val resultFlux: SFlux[WebSocketFrame] = configuration.websocketPingInterval + .map(interval => responseAndSinkFlux.mergeWith(pingMessagePublisher(interval))) + .getOrElse(responseAndSinkFlux) + + out.sendObject(resultFlux).`then`() } + private def pingMessagePublisher(duration: Duration): SFlux[WebSocketFrame] = + SFlux.interval(duration) + .map(_ => new PingWebSocketFrame()) + private def handleClientMessages(clientContext: ClientContext)(message: String): SMono[OutboundMessage] = pushSerializer.deserializeWebSocketInboundMessage(message) .fold(invalid => { diff --git a/src/site/xdoc/server/config-jmap.xml b/src/site/xdoc/server/config-jmap.xml index 23cea6c482..d92f30ac9d 100644 --- a/src/site/xdoc/server/config-jmap.xml +++ b/src/site/xdoc/server/config-jmap.xml @@ -66,6 +66,11 @@ <dd>Optional. URL for JMAP WebSocket route</dd> <dd>Default value: ws://localhost</dd> + <dt><strong>websocket.ping.interval</strong></dt> + <dd>Optional. Configure the duration of the interval between consecutive ping messages (as specified in RFC6455) sent by the server to the client over a WebSocket connection. + The supported unit is seconds (e.g: `3s` for a 3-second interval)</dd> + <dd>Default is empty, this feature is disabled.</dd> + <dt><strong>upload.max.size</strong></dt> <dd>Optional. Configuration max size for each upload file in new JMAP-RFC-8621.</dd> <dd>Default value: 30M. Supported units are B (bytes) K (KB) M (MB) G (GB).</dd> --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org