This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 06eee5f435aa2fd45136c5ff2700c98283557cd7 Author: Benoit Tellier <[email protected]> AuthorDate: Fri Nov 26 17:39:42 2021 +0700 JAMES-3676 Add more tests for ReactorUtils.toInputStream --- .../org/apache/james/util/ReactorUtilsTest.java | 83 +++++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java index 40a2b74..ba469da 100644 --- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java @@ -475,6 +475,25 @@ class ReactorUtilsTest { assertThat(inputStream).hasSameContentAs(new ByteArrayInputStream(bytes)); } + + @Test + void givenALongFluxBytesWhenIDoNotReadItBeforeClosingItThenTheOriginalFluxShouldBeDisposed() throws Exception { + byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII); + + AtomicBoolean canceled = new AtomicBoolean(false); + Flux<ByteBuffer> source = Flux.fromIterable(Bytes.asList(bytes)) + .window(3) + .flatMapSequential(Flux::collectList, 1, 1) + .map(Bytes::toArray) + .map(ByteBuffer::wrap) + .doOnCancel(() -> canceled.set(true)); + + InputStream inputStream = ReactorUtils.toInputStream(source); + inputStream.close(); + + assertThat(canceled.get()).isTrue(); + } + @Test void givenALongFluxBytesWhenIReadItPartiallyBeforeClosingItThenTheOriginalFluxShouldBeDisposed() throws Exception { byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII); @@ -482,18 +501,80 @@ class ReactorUtilsTest { AtomicBoolean canceled = new AtomicBoolean(false); Flux<ByteBuffer> source = Flux.fromIterable(Bytes.asList(bytes)) .window(3) - .flatMapSequential(Flux::collectList) + .flatMapSequential(Flux::collectList, 1, 1) .map(Bytes::toArray) .map(ByteBuffer::wrap) .doOnCancel(() -> canceled.set(true)); InputStream inputStream = ReactorUtils.toInputStream(source); + byte[] buffer = new byte[3]; + inputStream.read(buffer); inputStream.close(); assertThat(canceled.get()).isTrue(); } @Test + void givenALongFluxBytesWhenIReadItFullyWithoutClosingItThenTheOriginalFluxShouldBeDisposed() throws Exception { + byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII); + + AtomicBoolean canceled = new AtomicBoolean(false); + Flux<ByteBuffer> source = Flux.fromIterable(Bytes.asList(bytes)) + .window(3) + .flatMapSequential(Flux::collectList, 1, 1) + .map(Bytes::toArray) + .map(ByteBuffer::wrap) + .doFinally(any -> canceled.set(true)); + + InputStream inputStream = ReactorUtils.toInputStream(source); + IOUtils.readFully(inputStream, 41111); + // do not close it + assertThat(canceled.get()).isTrue(); + } + + @Test + void exceptionsShouldCancelOriginalFluxSubscription() { + AtomicBoolean canceled = new AtomicBoolean(false); + Flux<ByteBuffer> source = Flux.fromIterable(ImmutableList.of( + Mono.just("abc"), Mono.just("def"), + Mono.<String>error(new RuntimeException("Dummy")), + Mono.just("mno"))) + .doFinally(any -> canceled.set(true)) + .concatMap(s -> s, 1) + .map(String::getBytes) + .map(ByteBuffer::wrap); + + InputStream inputStream = ReactorUtils.toInputStream(source); + + try { + byte[] buffer = new byte[3]; + inputStream.read(buffer); + inputStream.read(buffer); + inputStream.read(buffer); + } catch (Exception e) { + // expected + } + + assertThat(canceled.get()).isTrue(); + } + + @Test + void exceptionsShouldBePropagated() { + Flux<ByteBuffer> source = Flux.fromIterable(ImmutableList.of( + Mono.just("abc"), Mono.just("def"), Mono.just("ghi"), Mono.just("jkl"), + Mono.<String>error(new RuntimeException("Dummy")), Mono.just("mno"))) + .concatMap(s -> s, 1) + .map(String::getBytes) + .map(ByteBuffer::wrap); + + InputStream inputStream = ReactorUtils.toInputStream(source); + + assertThatThrownBy(() -> IOUtils.toByteArray(inputStream)) + .hasMessage("Dummy"); + } + + + @Test void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); Flux<ByteBuffer> source = Flux.range(0, 10) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
