This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4ccb9b1636e5d6043ab37b31acca1c1bd16ce977
Author: Gautier DI FOLCO <gdifo...@linagora.com>
AuthorDate: Wed Jul 31 13:15:19 2019 +0200

    JAMES-2851 Add ReactorUtils.toInputStream to convert Flux<byte[]> to lazy 
InputStream
---
 .../java/org/apache/james/util/ReactorUtils.java   | 70 ++++++++++++++++++
 .../org/apache/james/util/ReactorUtilsTest.java    | 82 +++++++++++++++++++++-
 2 files changed, 151 insertions(+), 1 deletion(-)

diff --git 
a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java 
b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 42e6d8e..1ed7963 100644
--- 
a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ 
b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -18,10 +18,80 @@
  ****************************************************************/
 package org.apache.james.util;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.stream.Stream;
+
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class ReactorUtils {
     public static <T> Mono<T> executeAndEmpty(Runnable runnable) {
         return Mono.fromRunnable(runnable).then(Mono.empty());
     }
+
+    public static InputStream toInputStream(Flux<byte[]> byteArrays) {
+        return new StreamInputStream(byteArrays.toStream(1));
+    }
+
+    private static  class StreamInputStream extends InputStream {
+        private static final int NO_MORE_DATA = -1;
+
+        private final Stream<byte[]> source;
+        private final Spliterator<byte[]> spliterator;
+        private Optional<ByteArrayInputStream> currentItemByteStream;
+
+        StreamInputStream(Stream<byte[]> source) {
+            this.source = source;
+            this.spliterator = source.spliterator();
+            this.currentItemByteStream = Optional.empty();
+        }
+
+        @Override
+        public int read() {
+            try {
+                if (!dataAvailableToRead()) {
+                    switchToNextChunk();
+                }
+
+                if (!dataAvailableToRead()) {
+                    source.close();
+                    return NO_MORE_DATA;
+                }
+
+                return currentItemByteStream.map(ByteArrayInputStream::read)
+                    .filter(readResult -> readResult != NO_MORE_DATA)
+                    .orElseGet(this::readNextChunk);
+            } catch (Throwable t) {
+                source.close();
+                throw t;
+            }
+        }
+
+        private boolean dataAvailableToRead() {
+            return currentItemByteStream.isPresent();
+        }
+
+        private void switchToNextChunk() {
+            spliterator.tryAdvance(bytes ->
+                currentItemByteStream = Optional.of(new 
ByteArrayInputStream(bytes)));
+        }
+
+        private Integer readNextChunk() {
+            currentItemByteStream = Optional.empty();
+            return read();
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                source.close();
+            } finally {
+                super.close();
+            }
+        }
+    }
 }
diff --git 
a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
 
b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 3c991e0..98bb9e1 100644
--- 
a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ 
b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -20,10 +20,18 @@ package org.apache.james.util;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
 
 class ReactorUtilsTest {
 
@@ -69,4 +77,76 @@ class ReactorUtilsTest {
             }
         }
     }
-}
\ No newline at end of file
+
+    @Nested
+    class ToInputStream {
+        @Test
+        void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() 
throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.range(0, 10)
+                .subscribeOn(Schedulers.elastic())
+                .limitRate(2)
+                .doOnRequest(request -> generateElements.getAndAdd((int) 
request))
+                .map(index -> new byte[] {(byte) (int) index});
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            inputStream.read(readBytes, 0, readBytes.length);
+
+            assertThat(readBytes).contains(0, 1, 2, 3, 4);
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(6);
+        }
+
+        @Test
+        void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() 
throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] 
{3, 4, 5}, new byte[] {6, 7, 8})
+                    .subscribeOn(Schedulers.elastic())
+                    .limitRate(2)
+                    .doOnRequest(request -> generateElements.getAndAdd((int) 
request));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            inputStream.read(readBytes, 0, readBytes.length);
+
+            assertThat(readBytes).contains(0, 1, 2, 3, 4);
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(3);
+        }
+
+        @Test
+        void 
givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch()
 throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] 
{}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11})
+                    .subscribeOn(Schedulers.elastic())
+                    .limitRate(2)
+                    .doOnRequest(request -> generateElements.getAndAdd((int) 
request));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            inputStream.read(readBytes, 0, readBytes.length);
+
+            assertThat(readBytes).contains(0, 1, 2, 3, 4);
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(4);
+        }
+
+        @Test
+        void givenAnEmptyFluxShouldConsumeOnlyThePrefetch() throws 
IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.<byte[]>empty()
+                    .subscribeOn(Schedulers.elastic())
+                    .limitRate(2)
+                    .doOnRequest(request -> generateElements.getAndAdd((int) 
request));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            inputStream.read(readBytes, 0, readBytes.length);
+
+            assertThat(readBytes).contains(0, 0, 0, 0, 0);
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(1);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to