This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 48d1361172 JAMES-3491 - JMAP WebSockets - support ping interval (#2561)
48d1361172 is described below

commit 48d1361172e9fe58bb2bba281a3f018e205554c4
Author: vttran <vtt...@linagora.com>
AuthorDate: Wed Dec 18 09:14:50 2024 +0700

    JAMES-3491 - JMAP WebSockets - support ping interval (#2561)
---
 docs/modules/servers/partials/configure/jmap.adoc  |   4 +
 .../apache/james/modules/TestJMAPServerModule.java |  40 +++-
 .../WebSocketWithPingIntervalContract.scala        | 227 +++++++++++++++++++++
 .../MemoryWebSocketWithPingIntervalTest.java       |  65 ++++++
 .../james/jmap/core/JmapRfc8621Configuration.scala |  14 +-
 .../apache/james/jmap/routes/WebSocketRoutes.scala |  34 +--
 src/site/xdoc/server/config-jmap.xml               |   5 +
 7 files changed, 374 insertions(+), 15 deletions(-)

diff --git a/docs/modules/servers/partials/configure/jmap.adoc 
b/docs/modules/servers/partials/configure/jmap.adoc
index c5f697c53f..b2f0419844 100644
--- a/docs/modules/servers/partials/configure/jmap.adoc
+++ b/docs/modules/servers/partials/configure/jmap.adoc
@@ -33,6 +33,10 @@ Defaults to an empty list.
 | websocket.url.prefix
 | Optional. URL for JMAP WebSocket route. Default value: ws://localhost
 
+| websocket.ping.interval
+| Optional. Configure the duration of the interval between consecutive ping 
messages (as specified in RFC6455) sent by the server to the client over a 
WebSocket connection.
+The supported unit is seconds (e.g. `3s` for a 3-second interval). Defaults to 
empty, which means this feature is disabled.
+
 | email.send.max.size
 | Optional. Configuration max size for message created in RFC-8621.
 Default value: None. Supported units are B (bytes) K (KB) M (MB) G (GB).
diff --git 
a/server/container/guice/protocols/jmap/src/test/java/org/apache/james/modules/TestJMAPServerModule.java
 
b/server/container/guice/protocols/jmap/src/test/java/org/apache/james/modules/TestJMAPServerModule.java
index 7b70263afc..8e27983cbb 100644
--- 
a/server/container/guice/protocols/jmap/src/test/java/org/apache/james/modules/TestJMAPServerModule.java
+++ 
b/server/container/guice/protocols/jmap/src/test/java/org/apache/james/modules/TestJMAPServerModule.java
@@ -19,23 +19,51 @@
 
 package org.apache.james.modules;
 
+import static 
org.apache.james.jmap.core.JmapRfc8621Configuration.LOCALHOST_CONFIGURATION;
+
 import java.io.FileNotFoundException;
+import java.util.Map;
 import java.util.Optional;
 
 import jakarta.inject.Named;
 import jakarta.inject.Singleton;
 
+import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.james.jmap.JMAPConfiguration;
+import org.apache.james.jmap.core.JmapRfc8621Configuration;
 import org.apache.james.jwt.JwtConfiguration;
 import org.apache.james.jwt.JwtTokenVerifier;
 import org.apache.james.modules.mailbox.FastRetryBackoffModule;
+import org.apache.james.utils.PropertiesProvider;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 
 public class TestJMAPServerModule extends AbstractModule {
+    static class JmapRfc8621ConfigurationOverrideModule extends AbstractModule 
{
+        private final Map<String, Object> overrideJmapProperties;
+
+        JmapRfc8621ConfigurationOverrideModule(Map<String, Object> 
overrideJmapProperties) {
+            this.overrideJmapProperties = overrideJmapProperties;
+        }
+
+        @Provides
+        @Singleton
+        JmapRfc8621Configuration provideConfiguration(PropertiesProvider 
propertiesProvider) throws ConfigurationException {
+            try {
+                Configuration configuration = 
propertiesProvider.getConfiguration("jmap");
+                overrideJmapProperties.forEach(configuration::setProperty);
+                return JmapRfc8621Configuration.from(configuration);
+            } catch (FileNotFoundException e) {
+                return LOCALHOST_CONFIGURATION();
+            }
+        }
+    }
+
+    private final Map<String, Object> overrideJmapProperties;
 
     private static final String PUBLIC_PEM_KEY =
         "-----BEGIN PUBLIC KEY-----\n" +
@@ -80,15 +108,25 @@ public class TestJMAPServerModule extends AbstractModule {
             "ICQil1aaN7/2au+p7E4n7nzfYG7nRX5syDoqgBbdhpJxV8/5ohA=\n" +
             "-----END RSA PRIVATE KEY-----\n";
 
+    public TestJMAPServerModule(Map<String, Object> overrideJmapProperties) {
+        this.overrideJmapProperties = overrideJmapProperties;
+    }
+
+    public TestJMAPServerModule() {
+        this(ImmutableMap.of());
+    }
 
     @Override
     protected void configure() {
         install(new FastRetryBackoffModule());
+        if (!overrideJmapProperties.isEmpty()) {
+            install(new 
JmapRfc8621ConfigurationOverrideModule(overrideJmapProperties));
+        }
     }
 
     @Provides
     @Singleton
-    JMAPConfiguration provideConfiguration() throws FileNotFoundException, 
ConfigurationException {
+    JMAPConfiguration provideConfiguration() {
         return JMAPConfiguration.builder()
             .enable()
             .randomPort()
diff --git 
a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketWithPingIntervalContract.scala
 
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketWithPingIntervalContract.scala
new file mode 100644
index 0000000000..7e199ca6fa
--- /dev/null
+++ 
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketWithPingIntervalContract.scala
@@ -0,0 +1,227 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.contract
+
+import java.net.URI
+import java.util.UUID
+
+import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
+import okhttp3.OkHttpClient
+import org.apache.james.GuiceJamesServer
+import org.apache.james.jmap.JmapGuiceProbe
+import org.apache.james.jmap.api.change.State
+import org.apache.james.jmap.api.model.AccountId
+import org.apache.james.jmap.core.{PushState, UuidState}
+import org.apache.james.jmap.rfc8621.contract.Fixture._
+import org.apache.james.mailbox.model.MailboxPath
+import org.apache.james.modules.MailboxProbeImpl
+import org.apache.james.utils.DataProbeImpl
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{AfterEach, Test, Timeout}
+import sttp.capabilities.WebSockets
+import sttp.client3.monad.IdMonad
+import sttp.client3.okhttp.OkHttpSyncBackend
+import sttp.client3.{Identity, RequestT, SttpBackend, asWebSocket, 
basicRequest}
+import sttp.model.Uri
+import sttp.monad.MonadError
+import sttp.ws.WebSocketFrame.Text
+import sttp.ws.{WebSocket, WebSocketFrame}
+
+import scala.jdk.CollectionConverters._
+
+trait WebSocketWithPingIntervalContract {
+  private lazy val backend: SttpBackend[Identity, WebSockets] = 
OkHttpSyncBackend()
+  private lazy implicit val monadError: MonadError[Identity] = IdMonad
+
+  def startJmapServer(overrideJmapProperties: Map[String, Object]): 
GuiceJamesServer
+
+  def stopJmapServer(): Unit
+
+  @AfterEach
+  def afterEach(): Unit = {
+    stopJmapServer()
+  }
+
+  private def setUpJmapServer(overrideJmapProperties: Map[String, Object] = 
Map.empty): GuiceJamesServer = {
+    val server = startJmapServer(overrideJmapProperties)
+    server.getProbe(classOf[DataProbeImpl])
+      .fluent()
+      .addDomain(DOMAIN.asString())
+      .addUser(ANDRE.asString(), ANDRE_PASSWORD)
+      .addUser(BOB.asString(), BOB_PASSWORD)
+      .addUser(DAVID.asString(), "secret")
+    server
+  }
+
+  @Test
+  @Timeout(180)
+  def apiRequestsShouldBeProcessedWhenClientPingInterval(): Unit = {
+    val server = setUpJmapServer()
+    // Given a client that sends a PING frame every 2s
+    val intervalDurationInMillis = 2000
+    val backend: SttpBackend[Identity, WebSockets] = 
OkHttpSyncBackend.usingClient(new OkHttpClient.Builder()
+      .pingInterval(intervalDurationInMillis, 
java.util.concurrent.TimeUnit.MILLISECONDS)
+      .build())
+
+    // The websocket connection is kept alive while the client pings periodically
+    val response: Either[String, WebSocketFrame] = authenticatedRequest(server)
+      .response(asWebSocket[Identity, WebSocketFrame] { ws: 
WebSocket[Identity] =>
+        sendEchoTextFrame(ws)
+        Thread.sleep(intervalDurationInMillis * 3)
+        ws.receive()
+      })
+      .send(backend)
+      .body
+
+    val responseAsFrame = response.toOption.get
+    assertThat(responseAsFrame).isInstanceOf(classOf[WebSocketFrame.Text])
+    assertThatJson(responseAsFrame.asPayload)
+      .isEqualTo(
+        """{
+          |  "@type":"Response",
+          |  "requestId":"req-36",
+          |  "sessionState":"2c9f1b12-b35a-43e6-9af2-0106fb53a943",
+          |  "methodResponses":[
+          |    ["Core/echo",
+          |      {
+          |        "arg1":"arg1data",
+          |        "arg2":"arg2data"
+          |      },"c1"]
+          |  ]
+          |}
+          |""".stripMargin)
+  }
+
+  @Test
+  @Timeout(180)
+  def apiRequestsShouldBeProcessedWhenConfigurePingIntervalResponse(): Unit = {
+    // Given a server with configured ping interval of 2s
+    val server = setUpJmapServer(Map("websocket.ping.interval" -> "2s"))
+
+    val requestId1 = UUID.randomUUID().toString
+    val requestId2 = UUID.randomUUID().toString
+    val response: Either[String, List[WebSocketFrame]] =
+      authenticatedRequest(server)
+        .response(asWebSocket[Identity, List[WebSocketFrame]] {
+          ws =>
+            sendEchoTextFrame(ws, requestId1)
+            Thread.sleep(2000)
+            val frame1 = ws.receive()
+            sendEchoTextFrame(ws, requestId2)
+            Thread.sleep(2000)
+            val frame2 = ws.receive()
+            List(frame1, frame2)
+        })
+        .send(backend)
+        .body
+
+    val listResponseFrame = 
response.toOption.get.map(_.asInstanceOf[Text]).map(_.payload)
+    assertThat(listResponseFrame.asJava).hasSize(2)
+    assertThat(listResponseFrame.filter(frame => 
frame.contains(requestId1)).asJava).hasSize(1)
+    assertThat(listResponseFrame.filter(frame => 
frame.contains(requestId2)).asJava).hasSize(1)
+  }
+
+  @Test
+  @Timeout(180)
+  def pushEnableRequestsShouldBeProcessedWhenConfigurePingIntervalResponse(): 
Unit = {
+    val server = setUpJmapServer(Map("websocket.ping.interval" -> "2s"))
+
+    val bobPath = MailboxPath.inbox(BOB)
+    val accountId: AccountId = AccountId.fromUsername(BOB)
+    val mailboxId = 
server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
+
+    val response: Either[String, List[String]] =
+      authenticatedRequest(server)
+        .response(asWebSocket[Identity, List[String]] {
+          ws =>
+            ws.send(WebSocketFrame.text(
+              """{
+                |  "@type": "WebSocketPushEnable",
+                |  "dataTypes": ["Mailbox", "Email"]
+                |}""".stripMargin))
+
+            Thread.sleep(100)
+
+            ws.send(WebSocketFrame.text(
+              s"""{
+                 |  "@type": "Request",
+                 |  "id": "req-36",
+                 |  "using": ["urn:ietf:params:jmap:core", 
"urn:ietf:params:jmap:mail"],
+                 |  "methodCalls": [
+                 |    ["Email/set", {
+                 |      "accountId": "$ACCOUNT_ID",
+                 |      "create": {
+                 |        "aaaaaa":{
+                 |          "mailboxIds": {
+                 |             "${mailboxId.serialize}": true
+                 |          }
+                 |        }
+                 |      }
+                 |    }, "c1"]]
+                 |}""".stripMargin))
+
+            List(ws.receive().asPayload,
+              ws.receive().asPayload)
+        })
+        .send(backend)
+        .body
+
+    Thread.sleep(100)
+
+    val jmapGuiceProbe: JmapGuiceProbe = 
server.getProbe(classOf[JmapGuiceProbe])
+    val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
+    val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
+
+    val globalState: String = 
PushState.fromOption(Some(UuidState.fromJava(mailboxState)), 
Some(UuidState.fromJava(emailState))).get.value
+    val stateChange: String = 
s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
+
+    assertThat(response.toOption.get.asJava)
+      .hasSize(2)
+      .contains(stateChange)
+  }
+
+  private def authenticatedRequest(server: GuiceJamesServer): 
RequestT[Identity, Either[String, String], Any] = {
+    val port = server.getProbe(classOf[JmapGuiceProbe])
+      .getJmapPort
+      .getValue
+    basicRequest.get(Uri.apply(new URI(s"ws://127.0.0.1:$port/jmap/ws")))
+      .header("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+      .header("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+  }
+
+  private def sendEchoTextFrame(ws: WebSocket[Identity], requestId: String = 
"req-36"): Identity[Unit] = {
+    ws.send(WebSocketFrame.text(
+      s"""{
+        |  "@type": "Request",
+        |  "id": "$requestId",
+        |  "using": [ "urn:ietf:params:jmap:core"],
+        |  "methodCalls": [
+        |    [
+        |      "Core/echo",
+        |      {
+        |        "arg1": "arg1data",
+        |        "arg2": "arg2data"
+        |      },
+        |      "c1"
+        |    ]
+        |  ]
+        |}""".stripMargin))
+  }
+}
diff --git 
a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketWithPingIntervalTest.java
 
b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketWithPingIntervalTest.java
new file mode 100644
index 0000000000..df7fa61580
--- /dev/null
+++ 
b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketWithPingIntervalTest.java
@@ -0,0 +1,65 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.memory;
+
+import static 
org.apache.james.data.UsersRepositoryModuleChooser.Implementation.DEFAULT;
+
+import java.io.File;
+
+import org.apache.james.GuiceJamesServer;
+import org.apache.james.MemoryJamesConfiguration;
+import org.apache.james.MemoryJamesServerMain;
+import org.apache.james.jmap.rfc8621.contract.IdentityProbeModule;
+import 
org.apache.james.jmap.rfc8621.contract.WebSocketWithPingIntervalContract;
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule;
+import org.apache.james.modules.TestJMAPServerModule;
+import org.junit.jupiter.api.io.TempDir;
+
+import com.github.fge.lambdas.Throwing;
+
+import scala.collection.immutable.Map;
+import scala.jdk.javaapi.CollectionConverters;
+
+public class MemoryWebSocketWithPingIntervalTest implements 
WebSocketWithPingIntervalContract {
+    @TempDir
+    private File tmpDir;
+
+    private GuiceJamesServer guiceJamesServer;
+
+    @Override
+    public GuiceJamesServer startJmapServer(Map<String, Object> 
overrideJmapProperties) {
+        guiceJamesServer = 
MemoryJamesServerMain.createServer(MemoryJamesConfiguration.builder()
+                .workingDirectory(tmpDir)
+                .configurationFromClasspath()
+                .usersRepository(DEFAULT)
+                .enableJMAP()
+                .build())
+            .overrideWith(new 
TestJMAPServerModule(CollectionConverters.asJava(overrideJmapProperties)), new 
DelegationProbeModule(), new IdentityProbeModule());
+        Throwing.runnable(() -> guiceJamesServer.start()).run();
+        return guiceJamesServer;
+    }
+
+    @Override
+    public void stopJmapServer() {
+        if (guiceJamesServer != null && guiceJamesServer.isStarted()) {
+            guiceJamesServer.stop();
+        }
+    }
+}
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala
index 0c66fb16e7..c9f5098aa7 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala
@@ -20,16 +20,20 @@
 package org.apache.james.jmap.core
 
 import java.net.URI
+import java.time.temporal.ChronoUnit
 import java.util.Optional
 
+import com.google.common.base.Preconditions
 import com.google.common.collect.ImmutableList
 import org.apache.commons.configuration2.Configuration
 import org.apache.james.jmap.core.CapabilityIdentifier.CapabilityIdentifier
 import 
org.apache.james.jmap.core.JmapRfc8621Configuration.{JMAP_EMAIL_GET_FULL_MAX_SIZE_DEFAULT,
 JMAP_MAX_OBJECT_IN_GET, JMAP_MAX_OBJECT_IN_SET, 
JMAP_UPLOAD_QUOTA_LIMIT_DEFAULT, MAX_SIZE_ATTACHMENTS_PER_MAIL_DEFAULT, 
UPLOAD_LIMIT_DEFAULT}
 import org.apache.james.jmap.pushsubscription.PushClientConfiguration
-import org.apache.james.util.Size
+import org.apache.james.util.{DurationParser, Size}
 
+import scala.concurrent.duration.Duration
 import scala.jdk.CollectionConverters._
+import scala.jdk.DurationConverters._
 import scala.jdk.OptionConverters._
 
 object JmapConfigProperties {
@@ -37,6 +41,7 @@ object JmapConfigProperties {
   val MAX_SIZE_ATTACHMENTS_PER_MAIL_PROPERTY: String = 
"max.size.attachments.per.mail"
   val URL_PREFIX_PROPERTY: String = "url.prefix"
   val WEBSOCKET_URL_PREFIX_PROPERTY: String = "websocket.url.prefix"
+  val WEBSOCKET_PING_INTERVAL_PROPERTY: String = "websocket.ping.interval"
   val WEB_PUSH_MAX_TIMEOUT_SECONDS_PROPERTY: String = 
"webpush.maxTimeoutSeconds"
   val WEB_PUSH_MAX_CONNECTIONS_PROPERTY: String = "webpush.maxConnections"
   val WEB_PUSH_PREVENT_SERVER_SIDE_REQUEST_FORGERY: String = 
"webpush.prevent.server.side.request.forgery"
@@ -70,6 +75,12 @@ object JmapRfc8621Configuration {
     JmapRfc8621Configuration(
       urlPrefixString = 
Option(configuration.getString(URL_PREFIX_PROPERTY)).getOrElse(URL_PREFIX_DEFAULT),
       websocketPrefixString = 
Option(configuration.getString(WEBSOCKET_URL_PREFIX_PROPERTY)).getOrElse(WEBSOCKET_URL_PREFIX_DEFAULT),
+      websocketPingInterval = 
Option(configuration.getString(WEBSOCKET_PING_INTERVAL_PROPERTY))
+        .map(DurationParser.parse(_, ChronoUnit.SECONDS))
+        .map(duration => {
+          Preconditions.checkArgument(!duration.isZero && 
!duration.isNegative, s"`$WEBSOCKET_PING_INTERVAL_PROPERTY` must be 
positive".asInstanceOf[Object])
+          duration.toScala
+        }),
       dynamicJmapPrefixResolutionEnabled = 
configuration.getBoolean(DYNAMIC_JMAP_PREFIX_RESOLUTION_ENABLED_PROPERTY, 
false),
       supportsDelaySends = configuration.getBoolean(DELAY_SENDS_ENABLED, 
false),
       maxUploadSize = Option(configuration.getString(UPLOAD_LIMIT_PROPERTY, 
null))
@@ -107,6 +118,7 @@ object JmapRfc8621Configuration {
 
 case class JmapRfc8621Configuration(urlPrefixString: String,
                                     websocketPrefixString: String,
+                                    websocketPingInterval: Option[Duration] = 
None,
                                     dynamicJmapPrefixResolutionEnabled: 
Boolean = false,
                                     supportsDelaySends: Boolean = false,
                                     maxUploadSize: MaxSizeUpload = 
UPLOAD_LIMIT_DEFAULT,
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index 1406fe87a3..fa8f92f09f 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -26,7 +26,7 @@ import java.util.{Optional, stream}
 
 import com.google.common.collect.ImmutableMap
 import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
-import io.netty.handler.codec.http.websocketx.WebSocketFrame
+import io.netty.handler.codec.http.websocketx.{PingWebSocketFrame, 
TextWebSocketFrame, WebSocketFrame}
 import io.netty.handler.codec.http.{HttpHeaderNames, HttpMethod}
 import jakarta.inject.{Inject, Named}
 import org.apache.james.core.{ConnectionDescription, 
ConnectionDescriptionSupplier, Disconnector, Username}
@@ -51,9 +51,10 @@ import 
reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
 import reactor.core.publisher.{Mono, Sinks}
 import reactor.core.scala.publisher.{SFlux, SMono}
 import reactor.core.scheduler.Schedulers
-import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
+import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse, 
WebsocketServerSpec}
 import reactor.netty.http.websocket.{WebsocketInbound, WebsocketOutbound}
 
+import scala.concurrent.duration.Duration
 import scala.jdk.CollectionConverters._
 
 object WebSocketRoutes {
@@ -75,6 +76,7 @@ case class ClientContext(outbound: 
Sinks.Many[OutboundMessage], pushRegistration
 }
 
 class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val 
authenticator: Authenticator,
+                                 val configuration: JmapRfc8621Configuration,
                                  userProvisioner: UserProvisioning,
                                  @Named(JMAPInjectionKeys.JMAP) eventBus: 
EventBus,
                                  jmapApi: JMAPApi,
@@ -87,6 +89,7 @@ class WebSocketRoutes @Inject() 
(@Named(InjectionKeys.RFC_8621) val authenticato
   private val openingConnectionsMetric: Metric = 
metricFactory.generate("jmap_websocket_opening_connections_count")
   private val requestCountMetric: Metric = 
metricFactory.generate("jmap_websocket_requests_count")
   private val connectedUsers: 
java.util.concurrent.ConcurrentHashMap[ClientContext, ClientContext] = new 
java.util.concurrent.ConcurrentHashMap[ClientContext, ClientContext]
+  private val websocketServerSpec: WebsocketServerSpec = 
WebsocketServerSpec.builder.handlePing(false).build
 
   override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
     JMAPRoute.builder
@@ -103,7 +106,7 @@ class WebSocketRoutes @Inject() 
(@Named(InjectionKeys.RFC_8621) val authenticato
       .flatMap((mailboxSession: MailboxSession) => 
userProvisioner.provisionUser(mailboxSession)
         .`then`
         
.`then`(SMono(httpServerResponse.addHeader(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL,
 "jmap")
-          .sendWebsocket((in, out) => 
handleWebSocketConnection(mailboxSession)(in, out)))))
+          .sendWebsocket((in: WebsocketInbound, out: WebsocketOutbound) => 
handleWebSocketConnection(mailboxSession)(in, out), websocketServerSpec))))
       .onErrorResume(throwable => handleHttpHandshakeError(throwable, 
httpServerResponse))
       .asJava()
       .`then`()
@@ -115,11 +118,8 @@ class WebSocketRoutes @Inject() 
(@Named(InjectionKeys.RFC_8621) val authenticato
     val context = ClientContext(sink, new AtomicReference[Registration](), 
session)
     val responseFlux: SFlux[OutboundMessage] = 
SFlux[WebSocketFrame](in.aggregateFrames()
       .receiveFrames())
-      .map(frame => {
-        val bytes = new Array[Byte](frame.content().readableBytes)
-        frame.content().readBytes(bytes)
-        new String(bytes, StandardCharsets.UTF_8)
-      })
+      .filter(frame => frame.isInstanceOf[TextWebSocketFrame])
+      .map(frame => frame.asInstanceOf[TextWebSocketFrame].text())
       .doOnNext(_ => connectedUsers.put(context, context))
       .doOnNext(_ => requestCountMetric.increment())
       .flatMap(message => handleClientMessages(context)(message))
@@ -134,13 +134,21 @@ class WebSocketRoutes @Inject() 
(@Named(InjectionKeys.RFC_8621) val authenticato
         openingConnectionsMetric.decrement()
       })
 
-    out.sendString(
-      SFlux.merge(Seq(responseFlux, sink.asFlux()))
-        .map(pushSerializer.serialize)
-        .map(Json.stringify))
-      .`then`()
+    val responseAndSinkFlux: SFlux[WebSocketFrame] = 
SFlux.merge(Seq(responseFlux, sink.asFlux()))
+      .map(pushSerializer.serialize)
+      .map(json => new TextWebSocketFrame(Json.stringify(json)))
+
+    val resultFlux: SFlux[WebSocketFrame] = configuration.websocketPingInterval
+      .map(interval => 
responseAndSinkFlux.mergeWith(pingMessagePublisher(interval)))
+      .getOrElse(responseAndSinkFlux)
+
+    out.sendObject(resultFlux).`then`()
   }
 
+  private def pingMessagePublisher(duration: Duration): SFlux[WebSocketFrame] =
+    SFlux.interval(duration)
+      .map(_ => new PingWebSocketFrame())
+
   private def handleClientMessages(clientContext: ClientContext)(message: 
String): SMono[OutboundMessage] =
     pushSerializer.deserializeWebSocketInboundMessage(message)
       .fold(invalid => {
diff --git a/src/site/xdoc/server/config-jmap.xml 
b/src/site/xdoc/server/config-jmap.xml
index 23cea6c482..d92f30ac9d 100644
--- a/src/site/xdoc/server/config-jmap.xml
+++ b/src/site/xdoc/server/config-jmap.xml
@@ -66,6 +66,11 @@
                     <dd>Optional. URL for JMAP WebSocket route</dd>
                     <dd>Default value: ws://localhost</dd>
 
+                    <dt><strong>websocket.ping.interval</strong></dt>
+                    <dd>Optional. Configure the duration of the interval 
between consecutive ping messages (as specified in RFC6455) sent by the server 
to the client over a WebSocket connection.
+                        The supported unit is seconds (e.g. `3s` for a 
3-second interval).</dd>
+                    <dd>Defaults to empty, which means this feature is disabled.</dd>
+
                     <dt><strong>upload.max.size</strong></dt>
                     <dd>Optional. Configuration max size for each upload file 
in new JMAP-RFC-8621.</dd>
                     <dd>Default value: 30M. Supported units are B (bytes) K 
(KB) M (MB) G (GB).</dd>


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to