This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2e5b3ba15d8548c92c55bdaea8cba373fb7b139a Author: Rene Cordier <rcord...@linagora.com> AuthorDate: Wed Mar 11 13:45:46 2020 +0700 JAMES-3078 UploadRoutes --- .../apache/james/mailbox/AttachmentManager.java | 3 +- .../cassandra/mail/CassandraAttachmentMapper.java | 7 +- .../inmemory/mail/InMemoryAttachmentMapper.java | 10 +- .../mailbox/store/StoreAttachmentManager.java | 5 +- .../james/mailbox/store/StoreMessageManager.java | 1 + .../james/mailbox/store/mail/AttachmentMapper.java | 3 +- .../store/mail/model/AttachmentMapperTest.java | 18 +-- .../org/apache/james/jmap/http/UploadRoutes.java | 160 +++++++++++++++++++++ 8 files changed, 188 insertions(+), 19 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java index aabc4ed..83e0094 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java @@ -27,6 +27,7 @@ import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.Attachment; import org.apache.james.mailbox.model.AttachmentId; import org.apache.james.mailbox.model.MessageId; +import org.reactivestreams.Publisher; public interface AttachmentManager { @@ -36,7 +37,7 @@ public interface AttachmentManager { List<Attachment> getAttachments(List<AttachmentId> attachmentIds, MailboxSession mailboxSession) throws MailboxException; - void storeAttachment(Attachment attachment, MailboxSession mailboxSession) throws MailboxException; + Publisher<Void> storeAttachment(Attachment attachment, MailboxSession mailboxSession); void storeAttachmentsForMessage(Collection<Attachment> attachments, MessageId ownerMessageId, MailboxSession mailboxSession) throws MailboxException; } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java index adf1c3f..a2c4bad 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java @@ -110,12 +110,11 @@ public class CassandraAttachmentMapper implements AttachmentMapper { } @Override - public void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException { - ownerDAO.addOwner(attachment.getAttachmentId(), owner) + public Mono<Void> storeAttachmentForOwner(Attachment attachment, Username owner) { + return ownerDAO.addOwner(attachment.getAttachmentId(), owner) .then(Mono.from(blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST))) .map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId)) - .flatMap(attachmentDAOV2::storeAttachment) - .block(); + .flatMap(attachmentDAOV2::storeAttachment); } @Override diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryAttachmentMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryAttachmentMapper.java index 54d040c..3267824 100644 --- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryAttachmentMapper.java +++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryAttachmentMapper.java @@ -38,6 +38,8 @@ import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; +import reactor.core.publisher.Mono; + public class InMemoryAttachmentMapper implements AttachmentMapper { private static final int INITIAL_SIZE = 128; @@ -73,9 +75,11 @@ public class InMemoryAttachmentMapper implements AttachmentMapper { } @Override - public void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException { - attachmentsById.put(attachment.getAttachmentId(), attachment); - ownersByAttachmentId.put(attachment.getAttachmentId(), owner); + public Mono<Void> storeAttachmentForOwner(Attachment attachment, Username owner) { + return Mono.fromRunnable(() -> { + attachmentsById.put(attachment.getAttachmentId(), attachment); + ownersByAttachmentId.put(attachment.getAttachmentId(), owner); + }); } @Override diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java index 2c59619..f32d60b 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java @@ -34,6 +34,7 @@ import org.apache.james.mailbox.model.Attachment; import org.apache.james.mailbox.model.AttachmentId; import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.store.mail.AttachmentMapperFactory; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,8 +75,8 @@ public class StoreAttachmentManager implements AttachmentManager { } @Override - public void storeAttachment(Attachment attachment, MailboxSession mailboxSession) throws MailboxException { - attachmentMapperFactory.getAttachmentMapper(mailboxSession) + public Publisher<Void> storeAttachment(Attachment attachment, MailboxSession mailboxSession) { + return attachmentMapperFactory.getAttachmentMapper(mailboxSession) .storeAttachmentForOwner(attachment, mailboxSession.getUser()); } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java index 2736d37..f97e5f0 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java @@ -465,6 +465,7 @@ public class StoreMessageManager implements MessageManager { .addMetaData(message.metaData()) .build(), new MailboxIdRegistrationKey(mailbox.getMailboxId())) + .subscribeOn(Schedulers.elastic()) .block(); return new ComposedMessageId(mailbox.getMailboxId(), data.getMessageId(), data.getUid()); }, MailboxPathLocker.LockType.Write); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java index 8b411b3..ced1530 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java @@ -28,6 +28,7 @@ import org.apache.james.mailbox.model.Attachment; import org.apache.james.mailbox.model.AttachmentId; import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.store.transaction.Mapper; +import org.reactivestreams.Publisher; public interface AttachmentMapper extends Mapper { @@ -35,7 +36,7 @@ public interface AttachmentMapper extends Mapper { List<Attachment> getAttachments(Collection<AttachmentId> attachmentIds); - void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException; + Publisher<Void> storeAttachmentForOwner(Attachment attachment, Username owner); void storeAttachmentsForMessage(Collection<Attachment> attachments, MessageId ownerMessageId) throws MailboxException; diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/AttachmentMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/AttachmentMapperTest.java index 97469eb..ad21cf6 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/AttachmentMapperTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/AttachmentMapperTest.java @@ -37,6 +37,8 @@ import org.junit.jupiter.api.Test; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Mono; + public abstract class AttachmentMapperTest { private static final AttachmentId UNKNOWN_ATTACHMENT_ID = AttachmentId.from("unknown"); private static final Username OWNER = Username.of("owner"); @@ -73,7 +75,7 @@ public abstract class AttachmentMapperTest { .type("content") .build(); AttachmentId attachmentId = expected.getAttachmentId(); - attachmentMapper.storeAttachmentForOwner(expected, OWNER); + Mono.from(attachmentMapper.storeAttachmentForOwner(expected, OWNER)).block(); //When Attachment attachment = attachmentMapper.getAttachment(attachmentId); //Then @@ -116,21 +118,21 @@ public abstract class AttachmentMapperTest { } @Test - void getAttachmentsShouldReturnTheAttachmentsWhenSome() throws Exception { + void getAttachmentsShouldReturnTheAttachmentsWhenSome() { //Given Attachment expected = Attachment.builder() .bytes("payload".getBytes(StandardCharsets.UTF_8)) .type("content") .build(); AttachmentId attachmentId = expected.getAttachmentId(); - attachmentMapper.storeAttachmentForOwner(expected, OWNER); + Mono.from(attachmentMapper.storeAttachmentForOwner(expected, OWNER)).block(); Attachment expected2 = Attachment.builder() .bytes("payload2".getBytes(StandardCharsets.UTF_8)) .type("content") .build(); AttachmentId attachmentId2 = expected2.getAttachmentId(); - attachmentMapper.storeAttachmentForOwner(expected2, OWNER); + Mono.from(attachmentMapper.storeAttachmentForOwner(expected2, OWNER)).block(); //When List<Attachment> attachments = attachmentMapper.getAttachments(ImmutableList.of(attachmentId, attachmentId2)); @@ -153,7 +155,7 @@ public abstract class AttachmentMapperTest { .type("content") .build(); AttachmentId attachmentId = attachment.getAttachmentId(); - attachmentMapper.storeAttachmentForOwner(attachment, OWNER); + Mono.from(attachmentMapper.storeAttachmentForOwner(attachment, OWNER)).block(); //When Collection<MessageId> messageIds = attachmentMapper.getRelatedMessageIds(attachmentId); @@ -270,7 +272,7 @@ public abstract class AttachmentMapperTest { .build(); AttachmentId attachmentId = attachment.getAttachmentId(); - attachmentMapper.storeAttachmentForOwner(attachment, OWNER); + Mono.from(attachmentMapper.storeAttachmentForOwner(attachment, OWNER)).block(); //When Collection<Username> expectedOwners = ImmutableList.of(OWNER); @@ -305,8 +307,8 @@ public abstract class AttachmentMapperTest { .build(); AttachmentId attachmentId = attachment.getAttachmentId(); - attachmentMapper.storeAttachmentForOwner(attachment, OWNER); - attachmentMapper.storeAttachmentForOwner(attachment, ADDITIONAL_OWNER); + Mono.from(attachmentMapper.storeAttachmentForOwner(attachment, OWNER)).block(); + Mono.from(attachmentMapper.storeAttachmentForOwner(attachment, ADDITIONAL_OWNER)).block(); //When Collection<Username> expectedOwners = ImmutableList.of(OWNER, ADDITIONAL_OWNER); diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UploadRoutes.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UploadRoutes.java new file mode 100644 index 0000000..7589cbb --- /dev/null +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UploadRoutes.java @@ -0,0 +1,160 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.jmap.http; + +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.CREATED; +import static org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE_UTF8; +import static org.apache.james.jmap.http.JMAPUrls.UPLOAD; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +import javax.inject.Inject; + +import org.apache.james.jmap.JMAPRoutes; +import org.apache.james.jmap.draft.exceptions.BadRequestException; +import org.apache.james.jmap.draft.exceptions.InternalErrorException; +import org.apache.james.jmap.draft.exceptions.UnauthorizedException; +import org.apache.james.jmap.draft.model.UploadResponse; +import org.apache.james.mailbox.AttachmentManager; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.model.Attachment; +import org.apache.james.metrics.api.MetricFactory; +import org.apache.james.util.ReactorUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.google.common.io.ByteStreams; + +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; +import reactor.netty.http.server.HttpServerRoutes; + +public class UploadRoutes implements JMAPRoutes { + private static final Logger LOGGER = LoggerFactory.getLogger(UploadRoutes.class); + + static class CancelledUploadException extends RuntimeException { + + } + + private final MetricFactory metricFactory; + private final AuthenticationReactiveFilter authenticationReactiveFilter; + private final AttachmentManager attachmentManager; + private final ObjectMapper objectMapper; + + @Inject + private UploadRoutes(MetricFactory metricFactory, AuthenticationReactiveFilter authenticationReactiveFilter, AttachmentManager attachmentManager, ObjectMapper objectMapper) { + this.metricFactory = metricFactory; + this.authenticationReactiveFilter = authenticationReactiveFilter; + this.attachmentManager = attachmentManager; + this.objectMapper = objectMapper; + } + + @Override + public Logger logger() { + return LOGGER; + } + + @Override + public HttpServerRoutes define(HttpServerRoutes builder) { + return builder.post(UPLOAD, this::post) + .options(UPLOAD, CORS_CONTROL); + } + + private Mono<Void> post(HttpServerRequest request, HttpServerResponse response) { + String contentType = request.requestHeaders().get(CONTENT_TYPE); + if (Strings.isNullOrEmpty(contentType)) { + return response.status(BAD_REQUEST).send(); + } else { + return authenticationReactiveFilter.authenticate(request) + .flatMap(session -> post(request, response, contentType, session)) + .onErrorResume(CancelledUploadException.class, e -> handleCanceledUpload(response, e)) + .onErrorResume(BadRequestException.class, e -> handleBadRequest(response, e)) + .onErrorResume(UnauthorizedException.class, e -> handleAuthenticationFailure(response, e)) + .onErrorResume(InternalErrorException.class, e -> handleInternalError(response, e)) + .subscribeOn(Schedulers.elastic()); + } + } + + private Mono<Void> post(HttpServerRequest request, HttpServerResponse response, String contentType, MailboxSession session) { + InputStream content = ReactorUtils.toInputStream(request.receive().asByteBuffer()); + return Mono.from(metricFactory.runPublishingTimerMetric("JMAP-upload-post", + handle(contentType, content, session, response))); + } + + private Mono<Void> handle(String contentType, InputStream content, MailboxSession mailboxSession, HttpServerResponse response) { + return uploadContent(contentType, content, mailboxSession) + .flatMap(storedContent -> { + try { + return response.header(CONTENT_TYPE, JSON_CONTENT_TYPE_UTF8) + .status(CREATED) + .sendString(Mono.just(objectMapper.writeValueAsString(storedContent))) + .then(); + } catch (JsonProcessingException e) { + throw new InternalErrorException("Error serializing upload response", e); + } + }); + } + + private Mono<UploadResponse> uploadContent(String contentType, InputStream inputStream, MailboxSession session) { + return toBytesArray(inputStream) + .map(bytes -> Attachment.builder() + .bytes(bytes) + .type(contentType) + .build()) + .flatMap(attachment -> Mono.from(attachmentManager.storeAttachment(attachment, session)) + .thenReturn(UploadResponse.builder() + .blobId(attachment.getAttachmentId().getId()) + .type(attachment.getType()) + .size(attachment.getSize()) + .build())); + } + + private Mono<byte[]> toBytesArray(InputStream inputStream) { + return Mono.fromCallable(() -> { + try { + return ByteStreams.toByteArray(inputStream); + } catch (IOException e) { + if (e instanceof EOFException) { + throw new CancelledUploadException(); + } else { + throw new InternalErrorException("Error while uploading content", e); + } + } + }); + } + + private Mono<Void> handleCanceledUpload(HttpServerResponse response, CancelledUploadException e) { + LOGGER.info("An upload has been canceled before the end", e); + return response.send(); + } + + private Mono<Void> handleBadRequest(HttpServerResponse response, BadRequestException e) { + LOGGER.warn("Invalid authentication request received.", e); + return response.status(BAD_REQUEST).send(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org