This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 06eee5f435aa2fd45136c5ff2700c98283557cd7
Author: Benoit Tellier <[email protected]>
AuthorDate: Fri Nov 26 17:39:42 2021 +0700

    JAMES-3676 Add more tests for ReactorUtils.toInputStream
---
 .../org/apache/james/util/ReactorUtilsTest.java    | 83 +++++++++++++++++++++-
 1 file changed, 82 insertions(+), 1 deletion(-)

diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 40a2b74..ba469da 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -475,6 +475,25 @@ class ReactorUtilsTest {
             assertThat(inputStream).hasSameContentAs(new ByteArrayInputStream(bytes));
         }
 
+
+        @Test
+        void givenALongFluxBytesWhenIDoNotReadItBeforeClosingItThenTheOriginalFluxShouldBeDisposed() throws Exception {
+            byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII);
+
+            AtomicBoolean canceled = new AtomicBoolean(false);
+            Flux<ByteBuffer> source = Flux.fromIterable(Bytes.asList(bytes))
+                .window(3)
+                .flatMapSequential(Flux::collectList, 1, 1)
+                .map(Bytes::toArray)
+                .map(ByteBuffer::wrap)
+                .doOnCancel(() -> canceled.set(true));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            inputStream.close();
+
+            assertThat(canceled.get()).isTrue();
+        }
+
         @Test
         void givenALongFluxBytesWhenIReadItPartiallyBeforeClosingItThenTheOriginalFluxShouldBeDisposed() throws Exception {
             byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII);
@@ -482,18 +501,80 @@ class ReactorUtilsTest {
             AtomicBoolean canceled = new AtomicBoolean(false);
             Flux<ByteBuffer> source = Flux.fromIterable(Bytes.asList(bytes))
                 .window(3)
-                .flatMapSequential(Flux::collectList)
+                .flatMapSequential(Flux::collectList, 1, 1)
                 .map(Bytes::toArray)
                 .map(ByteBuffer::wrap)
                 .doOnCancel(() -> canceled.set(true));
 
             InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] buffer = new byte[3];
+            inputStream.read(buffer);
             inputStream.close();
 
             assertThat(canceled.get()).isTrue();
         }
 
         @Test
+        void givenALongFluxBytesWhenIReadItFullyWithoutClosingItThenTheOriginalFluxShouldBeDisposed() throws Exception {
+            byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII);
+
+            AtomicBoolean canceled = new AtomicBoolean(false);
+            Flux<ByteBuffer> source = Flux.fromIterable(Bytes.asList(bytes))
+                .window(3)
+                .flatMapSequential(Flux::collectList, 1, 1)
+                .map(Bytes::toArray)
+                .map(ByteBuffer::wrap)
+                .doFinally(any -> canceled.set(true));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            IOUtils.readFully(inputStream, 41111);
+            // do not close it
+            assertThat(canceled.get()).isTrue();
+        }
+
+        @Test
+        void exceptionsShouldCancelOriginalFluxSubscription() {
+            AtomicBoolean canceled = new AtomicBoolean(false);
+            Flux<ByteBuffer> source = Flux.fromIterable(ImmutableList.of(
+                Mono.just("abc"), Mono.just("def"),
+                Mono.<String>error(new RuntimeException("Dummy")),
+                Mono.just("mno")))
+                .doFinally(any -> canceled.set(true))
+                .concatMap(s -> s, 1)
+                .map(String::getBytes)
+                .map(ByteBuffer::wrap);
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+
+            try {
+                byte[] buffer = new byte[3];
+                inputStream.read(buffer);
+                inputStream.read(buffer);
+                inputStream.read(buffer);
+            } catch (Exception e) {
+                // expected
+            }
+
+            assertThat(canceled.get()).isTrue();
+        }
+
+        @Test
+        void exceptionsShouldBePropagated() {
+            Flux<ByteBuffer> source = Flux.fromIterable(ImmutableList.of(
+                Mono.just("abc"), Mono.just("def"), Mono.just("ghi"), Mono.just("jkl"),
+                Mono.<String>error(new RuntimeException("Dummy")), Mono.just("mno")))
+                .concatMap(s -> s, 1)
+                .map(String::getBytes)
+                .map(ByteBuffer::wrap);
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+
+            assertThatThrownBy(() -> IOUtils.toByteArray(inputStream))
+                .hasMessage("Dummy");
+        }
+
+
+        @Test
         void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
             Flux<ByteBuffer> source = Flux.range(0, 10)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to