This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 57acf284e6264b0cf3078f8cb2370118602bb8bf Author: Benoit Tellier <[email protected]> AuthorDate: Thu Jan 28 11:41:35 2021 +0700 JAMES-3491 Implement WebSocket routes --- .../james/jmap/rfc8621/RFC8621MethodsModule.java | 9 +- .../james/jmap/json/ResponseSerializer.scala | 11 +- .../apache/james/jmap/routes/WebSocketRoutes.scala | 145 +++++++++++++++++++++ .../main/java/org/apache/james/jmap/JMAPUrls.java | 1 + 4 files changed, 158 insertions(+), 8 deletions(-) diff --git a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java index 4de781d..530c7ec 100644 --- a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java +++ b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java @@ -54,6 +54,7 @@ import org.apache.james.jmap.routes.DownloadRoutes; import org.apache.james.jmap.routes.JMAPApiRoutes; import org.apache.james.jmap.routes.SessionRoutes; import org.apache.james.jmap.routes.UploadRoutes; +import org.apache.james.jmap.routes.WebSocketRoutes; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.utils.InitializationOperation; import org.apache.james.utils.InitilizationOperationBuilder; @@ -97,8 +98,12 @@ public class RFC8621MethodsModule extends AbstractModule { } @ProvidesIntoSet - JMAPRoutesHandler routesHandler(SessionRoutes sessionRoutes, JMAPApiRoutes jmapApiRoutes, DownloadRoutes downloadRoutes, UploadRoutes uploadRoutes) { - return new JMAPRoutesHandler(Version.RFC8621, jmapApiRoutes, sessionRoutes, downloadRoutes, uploadRoutes); + JMAPRoutesHandler routesHandler(SessionRoutes sessionRoutes, + JMAPApiRoutes jmapApiRoutes, + DownloadRoutes downloadRoutes, + UploadRoutes uploadRoutes, + WebSocketRoutes webSocketRoutes) { + return new JMAPRoutesHandler(Version.RFC8621, jmapApiRoutes, sessionRoutes, downloadRoutes, uploadRoutes, webSocketRoutes); } @Provides diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala index edace41..02ca667 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala @@ -202,6 +202,10 @@ object ResponseSerializer { "requestId" -> error.requestId.map(_.value).map(JsString).getOrElse(JsNull)) ++ errorJson.value) } + private implicit val webSocketOutboundWrites: Writes[WebSocketOutboundMessage] = { + case response: WebSocketResponse => webSocketResponseWrites.writes(response) + case error: WebSocketError => webSocketErrorWrites.writes(error) + } def serialize(session: Session): JsValue = Json.toJson(session) @@ -213,12 +217,7 @@ object ResponseSerializer { def serialize(errors: JsError): JsValue = Json.toJson(errors) - def serialize(response: WebSocketOutboundMessage): JsValue = { - case response: WebSocketResponse => Json.toJson(response) - case error: WebSocketError => Json.toJson(error) - } - - def serialize(errors: WebSocketError): JsValue = Json.toJson(errors) + def serialize(outboundMessage: WebSocketOutboundMessage): JsValue = Json.toJson(outboundMessage) def deserializeRequestObject(input: String): JsResult[RequestObject] = Json.parse(input).validate[RequestObject] diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala new file mode 100644 index 0000000..c4b2900 --- /dev/null +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala @@ -0,0 +1,145 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.jmap.routes + +import com.fasterxml.jackson.core.JsonParseException +import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE +import io.netty.handler.codec.http.HttpResponseStatus.{BAD_REQUEST, INTERNAL_SERVER_ERROR, UNAUTHORIZED} +import io.netty.handler.codec.http.websocketx.WebSocketFrame +import io.netty.handler.codec.http.{HttpMethod, HttpResponseStatus} +import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE +import org.apache.james.jmap.JMAPUrls.JMAP_WS +import org.apache.james.jmap.core.ProblemDetails.{notJSONProblem, notRequestProblem, unknownCapabilityProblem} +import org.apache.james.jmap.core.{ProblemDetails, RequestId, WebSocketError, WebSocketOutboundMessage, WebSocketRequest, WebSocketResponse} +import org.apache.james.jmap.exceptions.UnauthorizedException +import org.apache.james.jmap.http.rfc8621.InjectionKeys +import org.apache.james.jmap.http.{Authenticator, UserProvisioning} +import org.apache.james.jmap.json.ResponseSerializer +import org.apache.james.jmap.routes.WebSocketRoutes.LOGGER +import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes} +import org.apache.james.mailbox.MailboxSession +import org.slf4j.{Logger, LoggerFactory} +import reactor.core.publisher.Mono +import reactor.core.scala.publisher.{SFlux, SMono} +import reactor.core.scheduler.Schedulers +import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse} +import reactor.netty.http.websocket.{WebsocketInbound, WebsocketOutbound} + +import java.nio.charset.StandardCharsets +import java.util.stream +import javax.inject.{Inject, Named} + +object WebSocketRoutes { + val LOGGER: Logger = LoggerFactory.getLogger(classOf[WebSocketRoutes]) +} + +class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator, + userProvisioner: UserProvisioning, + jmapApi: JMAPApi) extends JMAPRoutes { + + override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of( + JMAPRoute.builder + .endpoint(new Endpoint(HttpMethod.GET, JMAP_WS)) + .action(this.handleWebSockets) + .corsHeaders, + JMAPRoute.builder + .endpoint(new Endpoint(HttpMethod.OPTIONS, JMAP_WS)) + .action(JMAPRoutes.CORS_CONTROL) + .corsHeaders()) + + private def handleWebSockets(httpServerRequest: HttpServerRequest, httpServerResponse: HttpServerResponse): Mono[Void] = { + SMono(authenticator.authenticate(httpServerRequest)) + .flatMap((mailboxSession: MailboxSession) => userProvisioner.provisionUser(mailboxSession) + .`then` + .`then`(SMono(httpServerResponse.sendWebsocket((in, out) => handleWebSocketConnection(mailboxSession)(in, out))))) + .onErrorResume(throwable => handleHttpHandshakeError(throwable, httpServerResponse)) + .subscribeOn(Schedulers.elastic) + .asJava() + .`then`() + } + + private def handleWebSocketConnection(session: MailboxSession)(in: WebsocketInbound, out: WebsocketOutbound): Mono[Void] = + SFlux[WebSocketFrame](in.aggregateFrames() + .receiveFrames()) + .map(frame => { + val bytes = new Array[Byte](frame.content().readableBytes) + frame.content().readBytes(bytes) + new String(bytes, StandardCharsets.UTF_8) + }) + .flatMap(handleClientMessages(session)) + .onErrorResume(e => SMono.just(asError(None)(e))) + .map(ResponseSerializer.serialize) + .map(_.toString) + .flatMap(response => out.sendString(SMono.just(response), StandardCharsets.UTF_8)) + .onErrorResume(e => { + e.printStackTrace() + SMono.empty + }) + .`then`() + .asJava() + .`then`() + + private def handleClientMessages(session: MailboxSession)(message: String): SMono[WebSocketOutboundMessage] = + ResponseSerializer.deserializeWebSocketInboundMessage(message) + .fold(invalid => { + val error = asError(None)(new IllegalArgumentException(invalid.toString())) + SMono.just[WebSocketOutboundMessage](error) + }, { + case request: WebSocketRequest => + jmapApi.process(request.requestObject, session) + .map[WebSocketOutboundMessage](WebSocketResponse(request.requestId, _)) + .onErrorResume(e => SMono.just(asError(request.requestId)(e))) + .subscribeOn(Schedulers.elastic) + }) + + private def handleHttpHandshakeError(throwable: Throwable, response: HttpServerResponse): SMono[Void] = throwable match { + case e: UnauthorizedException => + LOGGER.warn("Unauthorized", e) + respondDetails(response, + ProblemDetails(status = UNAUTHORIZED, detail = e.getMessage), + UNAUTHORIZED) + case e => + LOGGER.error("Unexpected error upon WebSocket handshake request", e) + respondDetails(response, + ProblemDetails(status = INTERNAL_SERVER_ERROR, detail = e.getMessage), + INTERNAL_SERVER_ERROR) + } + + private def asError(requestId: Option[RequestId])(throwable: Throwable): WebSocketError = throwable match { + case exception: IllegalArgumentException => + WebSocketError(requestId, notRequestProblem( + s"The request was successfully parsed as JSON but did not match the type signature of the Request object: ${exception.getMessage}")) + case exception: JsonParseException => + WebSocketError(requestId, notJSONProblem( + s"The content type of the request was not application/json or the request did not parse as I-JSON: ${exception.getMessage}")) + case exception: UnsupportedCapabilitiesException => + WebSocketError(requestId, unknownCapabilityProblem(s"The request used unsupported capabilities: ${exception.capabilities}")) + case e => + LOGGER.error("Unexpected error upon API request", e) + WebSocketError(requestId, ProblemDetails(status = INTERNAL_SERVER_ERROR, detail = e.getMessage)) + } + + private def respondDetails(httpServerResponse: HttpServerResponse, details: ProblemDetails, statusCode: HttpResponseStatus = BAD_REQUEST): SMono[Void] = + SMono.fromPublisher(httpServerResponse.status(statusCode) + .header(CONTENT_TYPE, JSON_CONTENT_TYPE) + .sendString(SMono.fromCallable(() => ResponseSerializer.serialize(details).toString), + StandardCharsets.UTF_8) + .`then`) +} diff --git a/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java b/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java index f7256c7..6148383 100644 --- a/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java +++ b/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java @@ -21,6 +21,7 @@ package org.apache.james.jmap; public interface JMAPUrls { String JMAP = "/jmap"; + String JMAP_WS = "/jmap/ws"; String AUTHENTICATION = "/authentication"; String DOWNLOAD = "/download"; String UPLOAD = "/upload"; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
