This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 89d5662490491fb5c33d7cb6b0f73fec0f895713
Author: Benoit Tellier <[email protected]>
AuthorDate: Thu Nov 25 14:21:16 2021 +0700

    JAMES-3676 ReactorUtils.toInputStream should cancel publisher subscriptions upon partial reads
---
 .../main/java/org/apache/james/util/ReactorUtils.java | 13 ++++++++++---
 .../java/org/apache/james/util/ReactorUtilsTest.java  | 19 +++++++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 2d2b5a3..9e64391 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Stream;
 
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
@@ -89,7 +90,7 @@ public class ReactorUtils {
     }
 
     public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) {
-        return new StreamInputStream(byteArrays.toIterable(1).iterator());
+        return new StreamInputStream(byteArrays.toStream(1));
     }
 
     public static Flux<ByteBuffer> toChunks(InputStream inputStream, int bufferSize) {
@@ -112,10 +113,12 @@ public class ReactorUtils {
         private static final int NO_MORE_DATA = -1;
 
         private final Iterator<ByteBuffer> source;
+        private final Stream<ByteBuffer> sourceAsStream;
         private Optional<ByteBuffer> currentItemByteStream;
 
-        StreamInputStream(Iterator<ByteBuffer> source) {
-            this.source = source;
+        StreamInputStream(Stream<ByteBuffer> source) {
+            this.source = source.iterator();
+            this.sourceAsStream = source;
             this.currentItemByteStream = Optional.empty();
         }
 
@@ -150,6 +153,10 @@ public class ReactorUtils {
             return currentItemByteStream;
         }
 
+        @Override
+        public void close() throws IOException {
+            sourceAsStream.close();
+        }
     }
 
     private static int byteToInt(ByteBuffer buffer) {
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 9b1a30c..40a2b74 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -32,6 +32,7 @@ import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
@@ -475,6 +476,24 @@ class ReactorUtilsTest {
         }
 
         @Test
+        void givenALongFluxBytesWhenIReadItPartiallyBeforeClosingItThenTheOriginalFluxShouldBeDisposed() throws Exception {
+            byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII);
+
+            AtomicBoolean canceled = new AtomicBoolean(false);
+            Flux<ByteBuffer> source = Flux.fromIterable(Bytes.asList(bytes))
+                .window(3)
+                .flatMapSequential(Flux::collectList)
+                .map(Bytes::toArray)
+                .map(ByteBuffer::wrap)
+                .doOnCancel(() -> canceled.set(true));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            inputStream.close();
+
+            assertThat(canceled.get()).isTrue();
+        }
+
+        @Test
         void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
             Flux<ByteBuffer> source = Flux.range(0, 10)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to