This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a84e855782044b1e741ff650bebdbc8551f378f6 Author: Rene Cordier <rcord...@linagora.com> AuthorDate: Wed Mar 11 13:41:55 2020 +0700 JAMES-3078 Download routes and tests --- .../java/org/apache/james/util/ReactorUtils.java | 16 ++ .../org/apache/james/util/ReactorUtilsTest.java | 91 ++++++++- .../integration/cucumber/DownloadStepdefs.java | 2 +- .../james/jmap/draft/utils/DownloadPath.java | 25 +-- .../org/apache/james/jmap/http/DownloadRoutes.java | 217 +++++++++++++++++++++ .../apache/james/jmap/http/DownloadRoutesTest.java | 58 ++++++ 6 files changed, 384 insertions(+), 25 deletions(-) diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java index dd9ceba..a87ab17 100644 --- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java +++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java @@ -37,6 +37,22 @@ public class ReactorUtils { return new StreamInputStream(byteArrays.toIterable(1).iterator()); } + /** Turns {@code inputStream} into a {@link Flux} of {@link ByteBuffer} chunks. Every generation step allocates a new {@code bufferSize}-byte array, issues one {@code inputStream.read(buffer)} call and emits a buffer wrapping only the bytes that were read. A negative read count completes the flux, an {@link IOException} is forwarded as an error signal, and a stream without any data yields a single zero-length buffer (see {@code defaultIfEmpty}). The stream is not closed here. NOTE(review): with {@code bufferSize == 0} the read would presumably keep returning 0 and the flux would never complete; confirm that callers always pass a positive size. @param inputStream the source to read from @param bufferSize the maximum size, in bytes, of each emitted chunk @return the chunks, in read order */ public static Flux<ByteBuffer> toChunks(InputStream inputStream, int bufferSize) { + return Flux.<ByteBuffer>generate(sink -> { + try { + /* new array for each chunk: ByteBuffer.wrap shares it with the emitted buffer */ byte[] buffer = new byte[bufferSize]; + int read = inputStream.read(buffer); + if (read >= 0) { + sink.next(ByteBuffer.wrap(buffer, 0, read)); + } else { + sink.complete(); + } + } catch (IOException e) { + sink.error(e); + } + }).defaultIfEmpty(ByteBuffer.wrap(new byte[0])); + } + private static class StreamInputStream extends InputStream { private static final int NO_MORE_DATA = -1; diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java index c01d09a..06edbc5 100644 --- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java @@ -25,6 +25,7 @@ 
import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.IOUtils; @@ -32,12 +33,15 @@ import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import com.google.common.collect.ImmutableList; import com.google.common.primitives.Bytes; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; class ReactorUtilsTest { + /* chunk size handed to ReactorUtils.toChunks by the ToChunks tests below */ static final int BUFFER_SIZE = 5; @Nested class ExecuteAndEmpty { @@ -86,7 +90,7 @@ class ReactorUtilsTest { class ToInputStream { @Test - void givenAFluxOf3BytesShouldReadSuccessfullyTheWholeSource() throws IOException, InterruptedException { + void givenAFluxOf3BytesShouldReadSuccessfullyTheWholeSource() { byte[] bytes = "foo bar ...".getBytes(StandardCharsets.US_ASCII); Flux<ByteBuffer> source = Flux.fromIterable(Bytes.asList(bytes)) @@ -101,7 +105,7 @@ class ReactorUtilsTest { } @Test - void givenALongFluxBytesShouldReadSuccessfullyTheWholeSource() throws IOException, InterruptedException { + void givenALongFluxBytesShouldReadSuccessfullyTheWholeSource() { byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII); Flux<ByteBuffer> source = Flux.fromIterable(Bytes.asList(bytes)) @@ -156,7 +160,7 @@ class ReactorUtilsTest { } @Test - void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { + void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException { AtomicInteger generateElements = new AtomicInteger(0); Flux<ByteBuffer> source = Flux.just( new byte[] {0, 1, 2}, @@ -195,4 +199,85 @@ class ReactorUtilsTest { assertThat(generateElements.get()).isEqualTo(1); } } + + /** Exercises ReactorUtils.toChunks with inputs smaller than, equal to and larger than BUFFER_SIZE, plus an empty input. */ @Nested + class ToChunks { + @Test + 
void givenInputStreamSmallerThanBufferSizeShouldReturnOneChunk() { + byte[] bytes = "foo".getBytes(StandardCharsets.UTF_8); + InputStream source = new ByteArrayInputStream(bytes); + + List<ByteBuffer> expected = ImmutableList.of(ByteBuffer.wrap(bytes)); + + List<ByteBuffer> chunks = ReactorUtils.toChunks(source, BUFFER_SIZE) + .collectList() + .block(); + + assertThat(chunks).isEqualTo(expected); + } + + @Test + void givenInputStreamEqualToBufferSizeShouldReturnOneChunk() { + byte[] bytes = "foooo".getBytes(StandardCharsets.UTF_8); + InputStream source = new ByteArrayInputStream(bytes); + + List<ByteBuffer> expected = ImmutableList.of(ByteBuffer.wrap(bytes)); + + List<ByteBuffer> chunks = ReactorUtils.toChunks(source, BUFFER_SIZE) + .collectList() + .block(); + + assertThat(chunks).isEqualTo(expected); + } + + @Test + void givenInputStreamSlightlyBiggerThanBufferSizeShouldReturnTwoChunks() { + byte[] bytes = "foobar...".getBytes(StandardCharsets.UTF_8); + InputStream source = new ByteArrayInputStream(bytes); + + /* 9 bytes with a 5-byte buffer: one full chunk followed by the 4 remaining bytes */ List<ByteBuffer> expected = ImmutableList.of( + ByteBuffer.wrap("fooba".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("r...".getBytes(StandardCharsets.UTF_8))); + + List<ByteBuffer> chunks = ReactorUtils.toChunks(source, BUFFER_SIZE) + .collectList() + .block(); + + assertThat(chunks).isEqualTo(expected); + } + + @Test + void givenInputStreamBiggerThanBufferSizeShouldReturnMultipleChunks() { + byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.UTF_8); + InputStream source = new ByteArrayInputStream(bytes); + + /* expected chunks: the source bytes regrouped into consecutive windows of BUFFER_SIZE bytes */ List<ByteBuffer> expected = Flux.fromIterable(Bytes.asList(bytes)) + .window(BUFFER_SIZE) + .flatMapSequential(Flux::collectList) + .map(Bytes::toArray) + .map(ByteBuffer::wrap) + .collectList() + .block(); + + List<ByteBuffer> chunks = ReactorUtils.toChunks(source, BUFFER_SIZE) + .collectList() + .block(); + + assertThat(chunks).isEqualTo(expected); + } + + @Test + void 
givenEmptyInputStreamShouldReturnEmptyChunk() { + byte[] bytes = "".getBytes(StandardCharsets.UTF_8); + InputStream source = new ByteArrayInputStream(bytes); + + List<ByteBuffer> chunks = ReactorUtils.toChunks(source, BUFFER_SIZE) + .collectList() + .block(); + + /* an empty stream is expected to yield exactly one zero-length buffer */ List<ByteBuffer> expected = ImmutableList.of(ByteBuffer.wrap(bytes)); + + assertThat(chunks).isEqualTo(expected); + } + } } diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/cucumber/DownloadStepdefs.java b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/cucumber/DownloadStepdefs.java index 9413cbb..961ada7 100644 --- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/cucumber/DownloadStepdefs.java +++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/cucumber/DownloadStepdefs.java @@ -450,7 +450,7 @@ public class DownloadStepdefs { @Then("^the user should receive an attachment access token$") public void accessTokenResponse() throws Throwable { assertThat(response.getStatusLine().getStatusCode()).isEqualTo(200); - assertThat(response.getHeaders("Content-Type")).extracting(Header::toString).containsExactly("Content-Type: text/plain"); + /* asserts on the header value alone instead of the rendered header line */ assertThat(response.getHeaders("Content-Type")).extracting(Header::getValue).containsExactly("text/plain"); assertThat(IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8)).isNotEmpty(); } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/utils/DownloadPath.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/utils/DownloadPath.java index ac4ad2c..29a4e61 100644 --- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/utils/DownloadPath.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/utils/DownloadPath.java @@ -19,32 +19,15 @@ package org.apache.james.jmap.draft.utils; -import java.util.List; import java.util.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.base.Strings; -import com.google.common.collect.Iterables; - public class DownloadPath { - - public static DownloadPath from(String path) { - Preconditions.checkArgument(!Strings.isNullOrEmpty(path), "'path' is mandatory"); - - // path = /blobId/name - // idx = 0 1 2 - List<String> pathVariables = Splitter.on('/').splitToList(path); - Preconditions.checkArgument(pathVariables.size() >= 1 && pathVariables.size() <= 3, "'blobId' is mandatory"); - - String blobId = Iterables.get(pathVariables, 1, null); - Preconditions.checkArgument(!Strings.isNullOrEmpty(blobId), "'blobId' is mandatory"); - - return new DownloadPath(blobId, name(pathVariables)); + /** Builds a path that references a blob by id only; the name is left empty. */ public static DownloadPath ofBlobId(String blobId) { + return new DownloadPath(blobId, Optional.empty()); } - private static Optional<String> name(List<String> pathVariables) { - return Optional.ofNullable(Strings.emptyToNull(Iterables.get(pathVariables, 2, null))); + /** Builds a path from a blob id and a download name; {@code name} must be non-null because it is wrapped with {@code Optional.of}. */ public static DownloadPath of(String blobId, String name) { + return new DownloadPath(blobId, Optional.of(name)); } private final String blobId; diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java new file mode 100644 index 0000000..b9ff099 --- /dev/null +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java @@ -0,0 +1,217 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more 
contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.jmap.http; + +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.apache.james.jmap.HttpConstants.TEXT_PLAIN_CONTENT_TYPE; +import static org.apache.james.jmap.http.JMAPUrls.DOWNLOAD; + +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +import javax.inject.Inject; + +import org.apache.james.jmap.JMAPRoutes; +import org.apache.james.jmap.draft.api.SimpleTokenFactory; +import org.apache.james.jmap.draft.exceptions.BadRequestException; +import org.apache.james.jmap.draft.exceptions.InternalErrorException; +import org.apache.james.jmap.draft.exceptions.UnauthorizedException; +import org.apache.james.jmap.draft.model.AttachmentAccessToken; +import org.apache.james.jmap.draft.utils.DownloadPath; +import org.apache.james.mailbox.BlobManager; +import org.apache.james.mailbox.MailboxSession; +import 
org.apache.james.mailbox.exception.BlobNotFoundException; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.model.Blob; +import org.apache.james.mailbox.model.BlobId; +import org.apache.james.metrics.api.MetricFactory; +import org.apache.james.mime4j.codec.EncoderUtil; +import org.apache.james.mime4j.codec.EncoderUtil.Usage; +import org.apache.james.util.ReactorUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.fge.lambdas.Throwing; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.CharMatcher; + +import io.netty.buffer.Unpooled; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; +import reactor.netty.http.server.HttpServerRoutes; + +/** JMAP draft download routes: POST answers with an attachment access token for a blob, GET streams the blob content. Both verbs are registered for the DOWNLOAD/{blobId} and DOWNLOAD/{blobId}/{name} URL shapes. */ public class DownloadRoutes implements JMAPRoutes { + private static final Logger LOGGER = LoggerFactory.getLogger(DownloadRoutes.class); + static final String BLOB_ID_PATH_PARAM = "blobId"; + private static final String NAME_PATH_PARAM = "name"; + private static final String DOWNLOAD_FROM_ID = String.format("%s/{%s}", DOWNLOAD, BLOB_ID_PATH_PARAM); + private static final String DOWNLOAD_FROM_ID_AND_NAME = String.format("%s/{%s}/{%s}", DOWNLOAD, BLOB_ID_PATH_PARAM, NAME_PATH_PARAM); + /* chunk size, in bytes, used when streaming a blob to the client */ private static final int BUFFER_SIZE = 16 * 1024; + + private final BlobManager blobManager; + private final SimpleTokenFactory simpleTokenFactory; + private final MetricFactory metricFactory; + private final AuthenticationReactiveFilter authenticationReactiveFilter; + + @Inject + @VisibleForTesting + DownloadRoutes(BlobManager blobManager, SimpleTokenFactory simpleTokenFactory, MetricFactory metricFactory, AuthenticationReactiveFilter authenticationReactiveFilter) { + this.blobManager = blobManager; + this.simpleTokenFactory = simpleTokenFactory; + this.metricFactory = metricFactory; + 
this.authenticationReactiveFilter = authenticationReactiveFilter; + } + + @Override + public Logger logger() { + return LOGGER; + } + + /** Registers the POST, GET and OPTIONS handlers for both URL shapes (with and without a name); OPTIONS requests are served by CORS_CONTROL. */ @Override + public HttpServerRoutes define(HttpServerRoutes builder) { + return builder.post(DOWNLOAD_FROM_ID, this::postFromId) + .get(DOWNLOAD_FROM_ID, this::getFromId) + .post(DOWNLOAD_FROM_ID_AND_NAME, this::postFromIdAndName) + .get(DOWNLOAD_FROM_ID_AND_NAME, this::getFromIdAndName) + .options(DOWNLOAD_FROM_ID, CORS_CONTROL) + .options(DOWNLOAD_FROM_ID_AND_NAME, CORS_CONTROL); + } + + /** POST on {blobId}: builds a name-less DownloadPath and delegates to post. */ private Mono<Void> postFromId(HttpServerRequest request, HttpServerResponse response) { + String blobId = request.param(BLOB_ID_PATH_PARAM); + DownloadPath downloadPath = DownloadPath.ofBlobId(blobId); + return post(request, response, downloadPath); + } + + /** POST on {blobId}/{name}: the name is taken from the path parameter as-is (no URL decoding in this method) and delegated to post. */ private Mono<Void> postFromIdAndName(HttpServerRequest request, HttpServerResponse response) { + String blobId = request.param(BLOB_ID_PATH_PARAM); + String name = request.param(NAME_PATH_PARAM); + DownloadPath downloadPath = DownloadPath.of(blobId, name); + return post(request, response, downloadPath); + } + + /** Authenticates the request, then answers with an attachment access token while recording the "JMAP-download-post" timer; the pipeline is subscribed on the elastic scheduler. The UnauthorizedException handler is chained after flatMap (same layout as get) so that it also covers an error signalled by authenticate(request) itself, not only errors raised while building the token response. */ private Mono<Void> post(HttpServerRequest request, HttpServerResponse response, DownloadPath downloadPath) { + return authenticationReactiveFilter.authenticate(request) + .flatMap(session -> Mono.from(metricFactory.runPublishingTimerMetric("JMAP-download-post", + respondAttachmentAccessToken(session, downloadPath, response))) + .onErrorResume(InternalErrorException.class, e -> handleInternalError(response, e))) + /* outside the flatMap on purpose: an operator only sees errors coming from upstream */ .onErrorResume(UnauthorizedException.class, e -> handleAuthenticationFailure(response, e)) + .subscribeOn(Schedulers.elastic()); + } + + /** GET on {blobId}: builds a name-less DownloadPath and delegates to get. */ private Mono<Void> getFromId(HttpServerRequest request, HttpServerResponse response) { + String blobId = request.param(BLOB_ID_PATH_PARAM); + DownloadPath downloadPath = DownloadPath.ofBlobId(blobId); + return get(request, response, downloadPath); + } + + /** GET on {blobId}/{name}: URL-decodes the name as UTF-8 before delegating to get; an unsupported encoding is reported as BadRequestException. NOTE(review): URLDecoder.decode may also throw IllegalArgumentException on malformed percent-escapes, which is not translated here - confirm how that case should be reported. */ private Mono<Void> getFromIdAndName(HttpServerRequest request, HttpServerResponse 
response) { + String blobId = request.param(BLOB_ID_PATH_PARAM); + try { + String name = URLDecoder.decode(request.param(NAME_PATH_PARAM), StandardCharsets.UTF_8.toString()); + DownloadPath downloadPath = DownloadPath.of(blobId, name); + return get(request, response, downloadPath); + } catch (UnsupportedEncodingException e) { + throw new BadRequestException("Wrong url encoding", e); + } + } + + /** Authenticates the request, then streams the blob while recording the "JMAP-download-get" timer; UnauthorizedException is turned into an authentication-failure response and the pipeline is subscribed on the elastic scheduler. */ private Mono<Void> get(HttpServerRequest request, HttpServerResponse response, DownloadPath downloadPath) { + return authenticationReactiveFilter.authenticate(request) + .flatMap(session -> Mono.from(metricFactory.runPublishingTimerMetric("JMAP-download-get", + download(session, downloadPath, response))) + .onErrorResume(InternalErrorException.class, e -> handleInternalError(response, e))) + .onErrorResume(UnauthorizedException.class, e -> handleAuthenticationFailure(response, e)) + .subscribeOn(Schedulers.elastic()); + } + + /** Replies NOT_FOUND when the blob cannot be retrieved for this session; otherwise replies OK with the serialized attachment access token, using the TEXT_PLAIN_CONTENT_TYPE content type. A MailboxException is rethrown synchronously as InternalErrorException. */ private Mono<Void> respondAttachmentAccessToken(MailboxSession mailboxSession, DownloadPath downloadPath, HttpServerResponse resp) { + String blobId = downloadPath.getBlobId(); + try { + if (!attachmentExists(mailboxSession, blobId)) { + return resp.status(NOT_FOUND).send(); + } + AttachmentAccessToken attachmentAccessToken = simpleTokenFactory.generateAttachmentAccessToken(mailboxSession.getUser().asString(), blobId); + return resp.header(CONTENT_TYPE, TEXT_PLAIN_CONTENT_TYPE) + .status(OK) + .sendString(Mono.just(attachmentAccessToken.serialize())) + .then(); + } catch (MailboxException e) { + throw new InternalErrorException("Error while asking attachment access token", e); + } + } + + /** Probes existence by retrieving the blob: true when retrieval succeeds, false on BlobNotFoundException; any other MailboxException propagates to the caller. */ private boolean attachmentExists(MailboxSession mailboxSession, String blobId) throws MailboxException { + try { + blobManager.retrieve(BlobId.fromString(blobId), mailboxSession); + return true; + } catch (BlobNotFoundException e) { + return false; + } + } + + /** Retrieves the blob and streams it to the response. The blob stream is obtained lazily through Mono.usingWhen, which also runs the close callback afterwards. An unknown blob is logged and answered with NOT_FOUND; any other MailboxException is rethrown synchronously as InternalErrorException. */ @VisibleForTesting + Mono<Void> download(MailboxSession mailboxSession, DownloadPath downloadPath, 
HttpServerResponse response) { + String blobId = downloadPath.getBlobId(); + try { + Blob blob = blobManager.retrieve(BlobId.fromString(blobId), mailboxSession); + + return Mono.usingWhen( + Mono.fromCallable(blob::getStream), + stream -> downloadBlob(downloadPath.getName(), response, blob.getSize(), blob.getContentType(), stream), + stream -> Mono.fromRunnable(Throwing.runnable(stream::close).sneakyThrow()) + ); + } catch (BlobNotFoundException e) { + LOGGER.info("Attachment '{}' not found", blobId, e); + return response.status(NOT_FOUND).send(); + } catch (MailboxException e) { + throw new InternalErrorException("Error while downloading", e); + } + } + + /** Sets the optional Content-Disposition header plus Content-Length and Content-Type, then sends the stream content in chunks of at most BUFFER_SIZE bytes with an OK status. */ private Mono<Void> downloadBlob(Optional<String> optionalName, HttpServerResponse response, long blobSize, String blobContentType, InputStream stream) { + return addContentDispositionHeader(optionalName, response) + .header("Content-Length", String.valueOf(blobSize)) + .header(CONTENT_TYPE, blobContentType) + .status(OK) + .send(ReactorUtils.toChunks(stream, BUFFER_SIZE) + .map(Unpooled::wrappedBuffer)) + .then(); + } + + /** Adds a Content-Disposition header only when a download name is present; otherwise returns the response untouched. */ private HttpServerResponse addContentDispositionHeader(Optional<String> optionalName, HttpServerResponse resp) { + return optionalName.map(name -> addContentDispositionHeaderRegardingEncoding(name, resp)) + .orElse(resp); + } + + /** Pure-ASCII names are sent as a quoted filename parameter; any other name is sent through the filename* parameter as an encoded-word produced by EncoderUtil. NOTE(review): the name is inserted without escaping double quotes or backslashes - confirm whether names containing them need handling. */ private HttpServerResponse addContentDispositionHeaderRegardingEncoding(String name, HttpServerResponse resp) { + if (CharMatcher.ascii().matchesAllOf(name)) { + return resp.header("Content-Disposition", "attachment; filename=\"" + name + "\""); + } else { + return resp.header("Content-Disposition", "attachment; filename*=\"" + EncoderUtil.encodeEncodedWord(name, Usage.TEXT_TOKEN) + "\""); + } + } +} diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/http/DownloadRoutesTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/http/DownloadRoutesTest.java new file mode 100644 index 0000000..a1786c8 --- /dev/null +++ 
b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/http/DownloadRoutesTest.java @@ -0,0 +1,58 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.jmap.http; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.james.core.Username; +import org.apache.james.jmap.draft.api.SimpleTokenFactory; +import org.apache.james.jmap.draft.exceptions.InternalErrorException; +import org.apache.james.jmap.draft.utils.DownloadPath; +import org.apache.james.mailbox.BlobManager; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.MailboxSessionUtil; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.metrics.tests.RecordingMetricFactory; +import org.junit.Test; + +import reactor.netty.http.server.HttpServerResponse; + +public class DownloadRoutesTest { + + /** A generic MailboxException raised by the mocked BlobManager.retrieve must surface from download() as an InternalErrorException. */ @Test + public void downloadShouldFailWhenUnknownErrorOnAttachmentManager() throws Exception { + MailboxSession mailboxSession = MailboxSessionUtil.create(Username.of("User")); + BlobManager mockedBlobManager = mock(BlobManager.class); + when(mockedBlobManager.retrieve(any(), eq(mailboxSession))) + .thenThrow(new MailboxException()); + AuthenticationReactiveFilter mockedAuthFilter = mock(AuthenticationReactiveFilter.class); + /* a null token factory is passed to the constructor; this test only calls download() */ SimpleTokenFactory nullSimpleTokenFactory = null; + + DownloadRoutes testee = new DownloadRoutes(mockedBlobManager, nullSimpleTokenFactory, new RecordingMetricFactory(), mockedAuthFilter); + + HttpServerResponse resp = mock(HttpServerResponse.class); + assertThatThrownBy(() -> testee.download(mailboxSession, DownloadPath.ofBlobId("blobId"), resp).block()) + .isInstanceOf(InternalErrorException.class); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: 
server-dev-h...@james.apache.org