This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 89d5662490491fb5c33d7cb6b0f73fec0f895713 Author: Benoit Tellier <[email protected]> AuthorDate: Thu Nov 25 14:21:16 2021 +0700 JAMES-3676 ReactorUtils.toInputStream should cancel publisher subscriptions upon partial reads --- .../main/java/org/apache/james/util/ReactorUtils.java | 13 ++++++++++--- .../java/org/apache/james/util/ReactorUtilsTest.java | 19 +++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java index 2d2b5a3..9e64391 100644 --- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java +++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java @@ -28,6 +28,7 @@ import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Stream; import org.reactivestreams.Publisher; import org.slf4j.Logger; @@ -89,7 +90,7 @@ public class ReactorUtils { } public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) { - return new StreamInputStream(byteArrays.toIterable(1).iterator()); + return new StreamInputStream(byteArrays.toStream(1)); } public static Flux<ByteBuffer> toChunks(InputStream inputStream, int bufferSize) { @@ -112,10 +113,12 @@ public class ReactorUtils { private static final int NO_MORE_DATA = -1; private final Iterator<ByteBuffer> source; + private final Stream<ByteBuffer> sourceAsStream; private Optional<ByteBuffer> currentItemByteStream; - StreamInputStream(Iterator<ByteBuffer> source) { - this.source = source; + StreamInputStream(Stream<ByteBuffer> source) { + this.source = source.iterator(); + this.sourceAsStream = source; this.currentItemByteStream = Optional.empty(); } @@ -150,6 +153,10 @@ public class ReactorUtils { return currentItemByteStream; } + @Override + public void close() throws IOException { + sourceAsStream.close(); + } } private static int byteToInt(ByteBuffer buffer) { diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java index 9b1a30c..40a2b74 100644 --- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java @@ -32,6 +32,7 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -475,6 +476,24 @@ class ReactorUtilsTest { } @Test + void givenALongFluxBytesWhenIReadItPartiallyBeforeClosingItThenTheOriginalFluxShouldBeDisposed() throws Exception { + byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII); + + AtomicBoolean canceled = new AtomicBoolean(false); + Flux<ByteBuffer> source = Flux.fromIterable(Bytes.asList(bytes)) + .window(3) + .flatMapSequential(Flux::collectList) + .map(Bytes::toArray) + .map(ByteBuffer::wrap) + .doOnCancel(() -> canceled.set(true)); + + InputStream inputStream = ReactorUtils.toInputStream(source); + inputStream.close(); + + assertThat(canceled.get()).isTrue(); + } + + @Test void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); Flux<ByteBuffer> source = Flux.range(0, 10) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
