This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4ccb9b1636e5d6043ab37b31acca1c1bd16ce977 Author: Gautier DI FOLCO <gdifo...@linagora.com> AuthorDate: Wed Jul 31 13:15:19 2019 +0200 JAMES-2851 Add ReactorUtils.toInputStream to convert Flux<byte[]> to lazy InputStream --- .../java/org/apache/james/util/ReactorUtils.java | 70 ++++++++++++++++++ .../org/apache/james/util/ReactorUtilsTest.java | 82 +++++++++++++++++++++- 2 files changed, 151 insertions(+), 1 deletion(-) diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java index 42e6d8e..1ed7963 100644 --- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java +++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java @@ -18,10 +18,80 @@ ****************************************************************/ package org.apache.james.util; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Optional; +import java.util.Spliterator; +import java.util.stream.Stream; + +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class ReactorUtils { public static <T> Mono<T> executeAndEmpty(Runnable runnable) { return Mono.fromRunnable(runnable).then(Mono.empty()); } + + public static InputStream toInputStream(Flux<byte[]> byteArrays) { + return new StreamInputStream(byteArrays.toStream(1)); + } + + private static class StreamInputStream extends InputStream { + private static final int NO_MORE_DATA = -1; + + private final Stream<byte[]> source; + private final Spliterator<byte[]> spliterator; + private Optional<ByteArrayInputStream> currentItemByteStream; + + StreamInputStream(Stream<byte[]> source) { + this.source = source; + this.spliterator = source.spliterator(); + this.currentItemByteStream = Optional.empty(); + } + + @Override + public int read() { + try { + if (!dataAvailableToRead()) { + switchToNextChunk(); + } + + if (!dataAvailableToRead()) { + source.close(); + return NO_MORE_DATA; + } + + return currentItemByteStream.map(ByteArrayInputStream::read) + .filter(readResult -> readResult != NO_MORE_DATA) + .orElseGet(this::readNextChunk); + } catch (Throwable t) { + source.close(); + throw t; + } + } + + private boolean dataAvailableToRead() { + return currentItemByteStream.isPresent(); + } + + private void switchToNextChunk() { + spliterator.tryAdvance(bytes -> + currentItemByteStream = Optional.of(new ByteArrayInputStream(bytes))); + } + + private Integer readNextChunk() { + currentItemByteStream = Optional.empty(); + return read(); + } + + @Override + public void close() throws IOException { + try { + source.close(); + } finally { + super.close(); + } + } + } } diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java index 3c991e0..98bb9e1 100644 --- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java @@ -20,10 +20,18 @@ package org.apache.james.util; import static org.assertj.core.api.Assertions.assertThat; +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; + import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; class ReactorUtilsTest { @@ -69,4 +77,76 @@ class ReactorUtilsTest { } } } -} \ No newline at end of file + + @Nested + class ToInputStream { + @Test + void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { + AtomicInteger generateElements = new AtomicInteger(0); + Flux<byte[]> source = Flux.range(0, 10) + .subscribeOn(Schedulers.elastic()) + .limitRate(2) + .doOnRequest(request -> generateElements.getAndAdd((int) request)) + .map(index -> new byte[] {(byte) (int) index}); + + InputStream inputStream = ReactorUtils.toInputStream(source); + byte[] readBytes = new byte[5]; + inputStream.read(readBytes, 0, readBytes.length); + + assertThat(readBytes).contains(0, 1, 2, 3, 4); + Thread.sleep(200); + assertThat(generateElements.get()).isEqualTo(6); + } + + @Test + void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { + AtomicInteger generateElements = new AtomicInteger(0); + Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}) + .subscribeOn(Schedulers.elastic()) + .limitRate(2) + .doOnRequest(request -> generateElements.getAndAdd((int) request)); + + InputStream inputStream = ReactorUtils.toInputStream(source); + byte[] readBytes = new byte[5]; + inputStream.read(readBytes, 0, readBytes.length); + + assertThat(readBytes).contains(0, 1, 2, 3, 4); + Thread.sleep(200); + assertThat(generateElements.get()).isEqualTo(3); + } + + @Test + void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { + AtomicInteger generateElements = new AtomicInteger(0); + Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11}) + .subscribeOn(Schedulers.elastic()) + .limitRate(2) + .doOnRequest(request -> generateElements.getAndAdd((int) request)); + + InputStream inputStream = ReactorUtils.toInputStream(source); + byte[] readBytes = new byte[5]; + inputStream.read(readBytes, 0, readBytes.length); + + assertThat(readBytes).contains(0, 1, 2, 3, 4); + Thread.sleep(200); + assertThat(generateElements.get()).isEqualTo(4); + } + + @Test + void givenAnEmptyFluxShouldConsumeOnlyThePrefetch() throws IOException, InterruptedException { + AtomicInteger generateElements = new AtomicInteger(0); + Flux<byte[]> source = Flux.<byte[]>empty() + .subscribeOn(Schedulers.elastic()) + .limitRate(2) + .doOnRequest(request -> generateElements.getAndAdd((int) request)); + + InputStream inputStream = ReactorUtils.toInputStream(source); + byte[] readBytes = new byte[5]; + inputStream.read(readBytes, 0, readBytes.length); + + assertThat(readBytes).contains(0, 0, 0, 0, 0); + Thread.sleep(200); + assertThat(generateElements.get()).isEqualTo(1); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org