This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new c2dfd43ffa [REACTOR] Reactive content
c2dfd43ffa is described below

commit c2dfd43ffab1632407ba1d06f69ceab8cb254de1
Author: Benoit Tellier <[email protected]>
AuthorDate: Thu Oct 6 10:02:34 2022 +0700

    [REACTOR] Reactive content
    
    Co-authored-by: Quan Tran <[email protected]>
---
 .../apache/james/mailbox/model/ByteContent.java    | 11 +++++++
 .../org/apache/james/mailbox/model/Content.java    | 17 ++++++++++
 .../org/apache/james/mailbox/model/Header.java     | 11 +++++++
 .../mailbox/model/HeaderAndBodyByteContent.java    | 10 ++++++
 .../james/mailbox/store/MessageResultImpl.java     |  7 ++++
 .../james/mailbox/store/MimeDescriptorImpl.java    | 17 ++++++++++
 .../james/mailbox/store/mail/model/Message.java    | 37 ++++++++++++++++++++++
 .../store/mail/model/impl/SimpleMessage.java       | 22 +++++++++++++
 .../mailbox/store/streaming/FullByteContent.java   | 23 ++++++++++++--
 .../store/streaming/InputStreamContent.java        | 11 +++++++
 .../james/imap/processor/fetch/EmptyContent.java   |  9 ++++++
 .../org/apache/james/rspamd/RspamdListener.java    | 15 ++++-----
 .../james/rspamd/client/RspamdHttpClient.java      | 14 ++++----
 .../james/rspamd/task/FeedHamToRspamdTask.java     |  2 +-
 .../james/rspamd/task/FeedSpamToRspamdTask.java    |  2 +-
 .../james/rspamd/client/RspamdHttpClientTest.java  | 18 ++++++-----
 .../james/rspamd/task/FeedHamToRspamdTaskTest.java |  7 ++--
 .../rspamd/task/FeedSpamToRspamdTaskTest.java      |  7 ++--
 18 files changed, 207 insertions(+), 33 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/model/ByteContent.java b/mailbox/api/src/main/java/org/apache/james/mailbox/model/ByteContent.java
index 21e004a111..a14a7daa8a 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/model/ByteContent.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/ByteContent.java
@@ -24,6 +24,11 @@ package org.apache.james.mailbox.model;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
 
 public final class ByteContent implements Content {
 
@@ -45,4 +50,10 @@ public final class ByteContent implements Content {
     public InputStream getInputStream() {
         return new ByteArrayInputStream(contents);
     }
+
+    @Override
+    public Publisher<ByteBuffer> reactiveBytes() {
+        return Flux.just(contents)
+            .map(ByteBuffer::wrap);
+    }
 }
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/model/Content.java b/mailbox/api/src/main/java/org/apache/james/mailbox/model/Content.java
index 30328fb4db..abee852e0a 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/model/Content.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/Content.java
@@ -21,14 +21,22 @@ package org.apache.james.mailbox.model;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.util.ReactorUtils;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * IMAP needs to know the size of the content before it starts to write it out.
  * This interface allows direct writing whilst exposing total size.
  */
 public interface Content {
+    int BUFFER_SIZE = 16384;
+
     /**
      * Return the content as {@link InputStream}
      */
@@ -40,4 +48,13 @@ public interface Content {
      * @return number of octets to be written
      */
     long size() throws MailboxException;
+
+    default Publisher<ByteBuffer> reactiveBytes() {
+        try {
+            return ReactorUtils.toChunks(getInputStream(), BUFFER_SIZE)
+                .subscribeOn(Schedulers.boundedElastic());
+        } catch (IOException e) {
+            return Flux.error(e);
+        }
+    }
 }
\ No newline at end of file
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/model/Header.java b/mailbox/api/src/main/java/org/apache/james/mailbox/model/Header.java
index 71b2e4cf1f..c9dcc918a5 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/model/Header.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/Header.java
@@ -26,6 +26,11 @@ import static java.nio.charset.StandardCharsets.US_ASCII;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
 
 /**
  * A header.
@@ -72,4 +77,10 @@ public final class Header implements Content {
     public InputStream getInputStream() {
         return new ByteArrayInputStream((name + ": " + value).getBytes(US_ASCII));
     }
+
+    @Override
+    public Publisher<ByteBuffer> reactiveBytes() {
+        return Flux.just((name + ": " + value).getBytes(US_ASCII))
+            .map(ByteBuffer::wrap);
+    }
 }
\ No newline at end of file
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/model/HeaderAndBodyByteContent.java b/mailbox/api/src/main/java/org/apache/james/mailbox/model/HeaderAndBodyByteContent.java
index b929fc26f2..bbd1740d6e 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/model/HeaderAndBodyByteContent.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/HeaderAndBodyByteContent.java
@@ -22,6 +22,11 @@ package org.apache.james.mailbox.model;
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.io.SequenceInputStream;
+import java.nio.ByteBuffer;
+
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
 
 public final class HeaderAndBodyByteContent implements Content {
 
@@ -47,4 +52,9 @@ public final class HeaderAndBodyByteContent implements Content {
             new ByteArrayInputStream(headers),
             new ByteArrayInputStream(body));
     }
+
+    @Override
+    public Publisher<ByteBuffer> reactiveBytes() {
+        return Flux.concat(Flux.just(headers).map(ByteBuffer::wrap), Flux.just(body).map(ByteBuffer::wrap));
+    }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageResultImpl.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageResultImpl.java
index 168365b369..73866e4a20 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageResultImpl.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageResultImpl.java
@@ -21,6 +21,7 @@ package org.apache.james.mailbox.store;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -47,6 +48,7 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.streaming.InputStreamContent;
 import org.apache.james.mailbox.store.streaming.InputStreamContent.Type;
 import org.apache.james.mime4j.MimeException;
+import org.reactivestreams.Publisher;
 
 import com.google.common.base.Objects;
 
@@ -346,6 +348,11 @@ public class MessageResultImpl implements MessageResult {
             return msg.getHeaderContent();
         }
 
+        @Override
+        public Publisher<ByteBuffer> reactiveBytes() {
+            return msg.getHeaderContentReactive();
+        }
+
         @Override
         public long size() {
             return msg.getHeaderOctets();
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MimeDescriptorImpl.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MimeDescriptorImpl.java
index 609d2f0791..0199aaaa42 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MimeDescriptorImpl.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MimeDescriptorImpl.java
@@ -24,6 +24,7 @@ import static java.nio.charset.StandardCharsets.US_ASCII;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -44,6 +45,10 @@ import org.apache.james.mime4j.stream.EntityState;
 import org.apache.james.mime4j.stream.MimeConfig;
 import org.apache.james.mime4j.stream.MimeTokenStream;
 import org.apache.james.mime4j.stream.RecursionMode;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class MimeDescriptorImpl implements MimeDescriptor {
 
@@ -315,6 +320,18 @@ public class MimeDescriptorImpl implements MimeDescriptor {
         return new ByteArrayInputStream(sb.toString().getBytes(US_ASCII));
     }
 
+    @Override
+    public Publisher<ByteBuffer> reactiveBytes() {
+        return Mono.fromCallable(() -> {
+            StringBuilder sb = new StringBuilder();
+            for (Header header : headers) {
+                sb.append(header.getName()).append(": ").append(header.getValue()).append("\r\n");
+            }
+            sb.append("\r\n");
+            return sb.toString().getBytes(US_ASCII);
+        }).flatMapMany(bytes -> Flux.just(bytes).map(ByteBuffer::wrap));
+    }
+
     @Override
     public long size() throws MailboxException {
         long result = 0;
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/Message.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/Message.java
index fef05fbe3a..a5f37bb198 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/Message.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/Message.java
@@ -18,14 +18,22 @@
  ****************************************************************/
 package org.apache.james.mailbox.store.mail.model;
 
+import static org.apache.james.mailbox.model.Content.BUFFER_SIZE;
+
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.Date;
 import java.util.List;
 
 import org.apache.james.mailbox.model.MessageAttachmentMetadata;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.store.mail.model.impl.Properties;
+import org.apache.james.util.ReactorUtils;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
 
 public interface Message {
 
@@ -42,6 +50,15 @@ public interface Message {
      */
     InputStream getBodyContent() throws IOException;
 
+    default Publisher<ByteBuffer> getBodyContentReactive() {
+        try {
+            return ReactorUtils.toChunks(getBodyContent(), BUFFER_SIZE)
+                .subscribeOn(Schedulers.boundedElastic());
+        } catch (IOException e) {
+            return Flux.error(e);
+        }
+    }
+
     /**
      * Gets the top level MIME content media type.
      *
@@ -86,6 +103,16 @@ public interface Message {
      */
     InputStream getHeaderContent() throws IOException;
 
+
+    default Publisher<ByteBuffer> getHeaderContentReactive() {
+        try {
+            return ReactorUtils.toChunks(getHeaderContent(), BUFFER_SIZE)
+                .subscribeOn(Schedulers.boundedElastic());
+        } catch (IOException e) {
+            return Flux.error(e);
+        }
+    }
+
     /**
      * Returns the full raw content of the MailboxMessage via an {@link InputStream}.
      *
@@ -94,6 +121,16 @@ public interface Message {
      */
     InputStream getFullContent() throws IOException;
 
+
+    default Publisher<ByteBuffer> getFullContentReactive() {
+        try {
+            return ReactorUtils.toChunks(getFullContent(), BUFFER_SIZE)
+                .subscribeOn(Schedulers.boundedElastic());
+        } catch (IOException e) {
+            return Flux.error(e);
+        }
+    }
+
     /**
      * Gets a read-only list of meta-data properties.
      * For properties with multiple values, this list will contain
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/SimpleMessage.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/SimpleMessage.java
index 4ebf1ba0ce..add87a967d 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/SimpleMessage.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/SimpleMessage.java
@@ -20,14 +20,19 @@ package org.apache.james.mailbox.store.mail.model.impl;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.Date;
 import java.util.List;
 
 import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.Content;
 import org.apache.james.mailbox.model.MessageAttachmentMetadata;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.store.mail.model.Message;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
 
 public class SimpleMessage implements Message {
 
@@ -121,4 +126,21 @@ public class SimpleMessage implements Message {
     public List<MessageAttachmentMetadata> getAttachments() {
         return attachments;
     }
+
+    @Override
+    public Publisher<ByteBuffer> getHeaderContentReactive() {
+        try {
+            if (bodyStartOctet >= content.size()) {
+                return content.reactiveBytes();
+            }
+        } catch (MailboxException e) {
+            return Flux.error(e);
+        }
+        return Message.super.getHeaderContentReactive();
+    }
+
+    @Override
+    public Publisher<ByteBuffer> getFullContentReactive() {
+        return content.reactiveBytes();
+    }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/FullByteContent.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/FullByteContent.java
index 7d11a5388f..fcf077a452 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/FullByteContent.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/FullByteContent.java
@@ -24,12 +24,17 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.SequenceInputStream;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 
 import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
 import org.apache.james.mailbox.model.Content;
 import org.apache.james.mailbox.model.Header;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 /**
  * Abstract base class for {@link Content} implementations which hold the headers and 
@@ -82,7 +87,21 @@ public class FullByteContent implements Content {
     public long size() {
         return size;
     }
-    
 
-    
+    @Override
+    public Publisher<ByteBuffer> reactiveBytes() {
+        Flux<ByteBuffer> headerContent = Mono.fromCallable(() -> {
+                StringBuilder sb = new StringBuilder();
+                for (Header header : headers) {
+                    sb.append(header.getName()).append(NAME_DELIMITER).append(header.getValue()).append(END_OF_LINE);
+                }
+                sb.append(END_OF_LINE);
+                return sb.toString().getBytes(US_ASCII);
+            })
+            .flatMapMany(bytes -> Flux.just(bytes).map(ByteBuffer::wrap));
+
+        Flux<ByteBuffer> bodyContent = Flux.just(body).map(ByteBuffer::wrap);
+
+        return Flux.concat(headerContent, bodyContent);
+    }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/InputStreamContent.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/InputStreamContent.java
index 1cf8e56ae3..5a2662095a 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/InputStreamContent.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/InputStreamContent.java
@@ -20,9 +20,11 @@ package org.apache.james.mailbox.store.streaming;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.james.mailbox.model.Content;
 import org.apache.james.mailbox.store.mail.model.Message;
+import org.reactivestreams.Publisher;
 
 /**
  * {@link Content} which is stored in a {@link InputStream}
@@ -65,4 +67,13 @@ public final class InputStreamContent implements Content {
        
     }
 
+    @Override
+    public Publisher<ByteBuffer> reactiveBytes() {
+        switch (type) {
+            case FULL:
+                return m.getFullContentReactive();
+            default:
+                return m.getBodyContentReactive();
+        }
+    }
 }
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/EmptyContent.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/EmptyContent.java
index 35b09a2832..6707882b48 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/EmptyContent.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/EmptyContent.java
@@ -21,8 +21,12 @@ package org.apache.james.imap.processor.fetch;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.james.mailbox.model.Content;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
 
 /**
  * Just an Empty {@link Content}
@@ -44,4 +48,9 @@ public class EmptyContent implements Content {
         return new ByteArrayInputStream(EMPTY_ARRAY);
     }
 
+    @Override
+    public Publisher<ByteBuffer> reactiveBytes() {
+        return Flux.just(EMPTY_ARRAY)
+            .map(ByteBuffer::wrap);
+    }
 }
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
index 61a505d88a..9562488b72 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
@@ -20,7 +20,7 @@
 package org.apache.james.rspamd;
 
 
-import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 import javax.inject.Inject;
 
@@ -48,7 +48,6 @@ import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 
 import reactor.core.publisher.Flux;
@@ -114,18 +113,18 @@ public class RspamdListener implements SpamEventListener, EventListener.Reactive
             .map(mailbox -> Pair.of(mailbox, mapperFactory.getMessageMapper(session)))
             .flatMapMany(pair -> Flux.fromIterable(MessageRange.toRanges(addedEvent.getUids()))
                 .flatMap(range -> pair.getRight().findInMailboxReactive(pair.getLeft(), range, MessageMapper.FetchType.FULL, LIMIT)))
-            .map(Throwing.function(MailboxMessage::getFullContent))
+            .map(MailboxMessage::getFullContentReactive)
             .flatMap(rspamdHttpClient::reportAsHam, ReactorUtils.DEFAULT_CONCURRENCY)
             .then();
     }
 
-    private Flux<InputStream> mailboxMessagePublisher(MessageMoveEvent messageMoveEvent) {
+    private Flux<ByteBuffer> mailboxMessagePublisher(MessageMoveEvent messageMoveEvent) {
         return Mono.fromCallable(() -> mapperFactory.getMessageIdMapper(mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()))))
             .flatMapMany(messageIdMapper -> messageIdMapper.findReactive(messageMoveEvent.getMessageIds(), MessageMapper.FetchType.FULL))
-            .map(Throwing.function(MailboxMessage::getFullContent));
+            .flatMap(MailboxMessage::getFullContentReactive);
     }
 
-    private Mono<Void> handleMessageMoved(Flux<InputStream> mailboxMessagesPublisher, MessageMoveEvent messageMoveEvent) {
+    private Mono<Void> handleMessageMoved(Flux<ByteBuffer> mailboxMessagesPublisher, MessageMoveEvent messageMoveEvent) {
         Mono<Boolean> reportHamIfNotSpamDetected = isMessageMovedOutOfSpamMailbox(messageMoveEvent)
             .filter(FunctionalUtils.identityPredicate())
             .doOnNext(isHam -> LOGGER.debug("Ham event detected, EventId = {}", messageMoveEvent.getEventId().getId()));
@@ -134,11 +133,11 @@ public class RspamdListener implements SpamEventListener, EventListener.Reactive
             .flatMap(isSpam -> {
                 if (isSpam) {
                     LOGGER.debug("Spam event detected, EventId = {}", messageMoveEvent.getEventId().getId());
-                    return mailboxMessagesPublisher.flatMap(rspamdHttpClient::reportAsSpam, ReactorUtils.DEFAULT_CONCURRENCY)
+                    return rspamdHttpClient.reportAsSpam(mailboxMessagesPublisher)
                         .then();
                 } else {
                     return reportHamIfNotSpamDetected
-                        .flatMapMany(isHam -> mailboxMessagesPublisher.flatMap(rspamdHttpClient::reportAsHam, ReactorUtils.DEFAULT_CONCURRENCY))
+                        .flatMapMany(isHam -> rspamdHttpClient.reportAsHam(mailboxMessagesPublisher))
                         .then();
                 }
             });
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RspamdHttpClient.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RspamdHttpClient.java
index 4fb758edff..1499f5f79a 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RspamdHttpClient.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RspamdHttpClient.java
@@ -21,7 +21,7 @@ package org.apache.james.rspamd.client;
 
 import static org.apache.james.rspamd.client.RspamdClientConfiguration.DEFAULT_TIMEOUT_IN_SECONDS;
 
-import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Optional;
@@ -36,6 +36,7 @@ import org.apache.james.server.core.MimeMessageInputStream;
 import org.apache.james.util.ReactorUtils;
 import org.apache.mailet.AttributeName;
 import org.apache.mailet.Mail;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +47,7 @@ import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableList;
 
 import io.netty.buffer.Unpooled;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.netty.ByteBufMono;
 import reactor.netty.http.client.HttpClient;
@@ -82,11 +84,11 @@ public class RspamdHttpClient {
             .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
     }
 
-    public Mono<Void> reportAsSpam(InputStream content) {
+    public Mono<Void> reportAsSpam(Publisher<ByteBuffer> content) {
         return reportMail(content, LEARN_SPAM_ENDPOINT);
     }
 
-    public Mono<Void> reportAsHam(InputStream content) {
+    public Mono<Void> reportAsHam(Publisher<ByteBuffer> content) {
         return reportMail(content, LEARN_HAM_ENDPOINT);
     }
 
@@ -127,12 +129,10 @@ public class RspamdHttpClient {
             .headers(headers -> headers.add("Password", configuration.getPassword()));
     }
 
-    private Mono<Void> reportMail(InputStream content, String endpoint) {
+    private Mono<Void> reportMail(Publisher<ByteBuffer> content, String endpoint) {
         return httpClient.post()
             .uri(endpoint)
-            .send(ReactorUtils.toChunks(content, BUFFER_SIZE)
-                .map(Unpooled::wrappedBuffer)
-                .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER))
+            .send(Flux.from(content).map(Unpooled::wrappedBuffer))
             .responseSingle(this::reportMailHttpResponseHandler);
     }
 
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java
index 36fef077aa..d7f578b993 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java
@@ -239,7 +239,7 @@ public class FeedHamToRspamdTask implements Task {
             .transform(ReactorUtils.<MessageResult, Result>throttle()
                 .elements(runningOptions.getMessagesPerSecond())
                 .per(Duration.ofSeconds(1))
-                .forOperation(messageResult -> rspamdHttpClient.reportAsHam(Throwing.supplier(() -> messageResult.getFullContent().getInputStream()).get())
+                .forOperation(messageResult -> rspamdHttpClient.reportAsHam(Throwing.supplier(() -> messageResult.getFullContent().reactiveBytes()).get())
                     .timeout(runningOptions.getRspamdTimeout())
                     .then(Mono.fromCallable(() -> {
                         context.incrementReportedHamMessageCount(1);
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java
index 8df41d4369..dcb6770674 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java
@@ -240,7 +240,7 @@ public class FeedSpamToRspamdTask implements Task {
             .transform(ReactorUtils.<MessageResult, Task.Result>throttle()
                 .elements(runningOptions.getMessagesPerSecond())
                 .per(Duration.ofSeconds(1))
-                .forOperation(messageResult -> rspamdHttpClient.reportAsSpam(Throwing.supplier(() -> messageResult.getFullContent().getInputStream()).get())
+                .forOperation(messageResult -> rspamdHttpClient.reportAsSpam(Throwing.supplier(() -> messageResult.getFullContent().reactiveBytes()).get())
                     .timeout(runningOptions.getRspamdTimeout())
                     .then(Mono.fromCallable(() -> {
                         context.incrementReportedSpamMessageCount(1);
diff --git a/third-party/rspamd/src/test/java/org/apache/james/rspamd/client/RspamdHttpClientTest.java b/third-party/rspamd/src/test/java/org/apache/james/rspamd/client/RspamdHttpClientTest.java
index 3271e9b064..bbbbae88ec 100644
--- a/third-party/rspamd/src/test/java/org/apache/james/rspamd/client/RspamdHttpClientTest.java
+++ b/third-party/rspamd/src/test/java/org/apache/james/rspamd/client/RspamdHttpClientTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.rspamd.client;
 
+import static org.apache.james.mailbox.model.Content.BUFFER_SIZE;
 import static org.apache.james.rspamd.DockerRspamd.PASSWORD;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
@@ -37,6 +38,7 @@ import org.apache.james.rspamd.exception.UnauthorizedException;
 import org.apache.james.rspamd.model.AnalysisResult;
 import org.apache.james.util.MimeMessageUtil;
 import org.apache.james.util.Port;
+import org.apache.james.util.ReactorUtils;
 import org.apache.james.webadmin.WebAdminUtils;
 import org.apache.mailet.Mail;
 import org.apache.mailet.base.test.FakeMail;
@@ -170,7 +172,7 @@ class RspamdHttpClientTest {
         RspamdClientConfiguration configuration = new RspamdClientConfiguration(rspamdExtension.getBaseUrl(), PASSWORD, Optional.empty());
         RspamdHttpClient client = new RspamdHttpClient(configuration);
 
-        assertThatCode(() -> client.reportAsSpam(spamMessage.getMessage().getInputStream()).block())
+        assertThatCode(() -> client.reportAsSpam(ReactorUtils.toChunks(spamMessage.getMessage().getInputStream(), BUFFER_SIZE)).block())
             .doesNotThrowAnyException();
     }
 
@@ -179,7 +181,7 @@ class RspamdHttpClientTest {
         RspamdClientConfiguration configuration = new RspamdClientConfiguration(rspamdExtension.getBaseUrl(), PASSWORD, Optional.empty());
         RspamdHttpClient client = new RspamdHttpClient(configuration);
 
-        assertThatCode(() -> client.reportAsHam(hamMessage.getMessage().getInputStream()).block())
+        assertThatCode(() -> client.reportAsHam(ReactorUtils.toChunks(hamMessage.getMessage().getInputStream(), BUFFER_SIZE)).block())
             .doesNotThrowAnyException();
     }
 
@@ -188,8 +190,8 @@ class RspamdHttpClientTest {
         RspamdClientConfiguration configuration = new RspamdClientConfiguration(rspamdExtension.getBaseUrl(), PASSWORD, Optional.empty());
         RspamdHttpClient client = new RspamdHttpClient(configuration);
 
-        client.reportAsHam(hamMessage.getMessage().getInputStream()).block();
-        assertThatCode(() -> client.reportAsHam(hamMessage.getMessage().getInputStream()).block())
+        client.reportAsHam(ReactorUtils.toChunks(hamMessage.getMessage().getInputStream(), BUFFER_SIZE)).block();
+        assertThatCode(() -> client.reportAsHam(ReactorUtils.toChunks(hamMessage.getMessage().getInputStream(), BUFFER_SIZE)).block())
             .doesNotThrowAnyException();
     }
 
@@ -198,8 +200,8 @@ class RspamdHttpClientTest {
         RspamdClientConfiguration configuration = new RspamdClientConfiguration(rspamdExtension.getBaseUrl(), PASSWORD, Optional.empty());
         RspamdHttpClient client = new RspamdHttpClient(configuration);
 
-        client.reportAsSpam(spamMessage.getMessage().getInputStream()).block();
-        assertThatCode(() -> client.reportAsSpam(spamMessage.getMessage().getInputStream()).block())
+        client.reportAsSpam(ReactorUtils.toChunks(spamMessage.getMessage().getInputStream(), BUFFER_SIZE)).block();
+        assertThatCode(() -> client.reportAsSpam(ReactorUtils.toChunks(spamMessage.getMessage().getInputStream(), BUFFER_SIZE)).block())
             .doesNotThrowAnyException();
     }
 
@@ -222,11 +224,11 @@ class RspamdHttpClientTest {
     }
 
     private void reportAsSpam(RspamdHttpClient client, InputStream inputStream) {
-        client.reportAsSpam(inputStream).block();
+        client.reportAsSpam(ReactorUtils.toChunks(inputStream, BUFFER_SIZE)).block();
     }
 
     private void reportAsHam(RspamdHttpClient client, InputStream inputStream) {
-        client.reportAsHam(inputStream).block();
+        client.reportAsHam(ReactorUtils.toChunks(inputStream, BUFFER_SIZE)).block();
     }
 
 }
diff --git a/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRspamdTaskTest.java b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRspamdTaskTest.java
index 87ddd12f16..b1c27db7f5 100644
--- a/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRspamdTaskTest.java
+++ b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRspamdTaskTest.java
@@ -29,8 +29,8 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayInputStream;
-import java.io.InputStream;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.time.Clock;
 import java.time.Instant;
 import java.time.ZonedDateTime;
@@ -71,6 +71,7 @@ import org.mockserver.integration.ClientAndServer;
 import org.mockserver.model.Delay;
 import org.mockserver.model.HttpRequest;
 import org.mockserver.model.HttpResponse;
+import org.reactivestreams.Publisher;
 
 import com.github.fge.lambdas.Throwing;
 
@@ -103,8 +104,8 @@ public class FeedHamToRspamdTaskTest {
         }
 
         @Override
-        public Mono<Void> reportAsHam(InputStream content) {
-            return Mono.fromCallable(() -> content)
+        public Mono<Void> reportAsHam(Publisher<ByteBuffer> content) {
+            return Mono.from(content)
                 .doOnNext(e -> hitCounter.incrementAndGet())
                 .then();
         }
diff --git a/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRspamdTaskTest.java b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRspamdTaskTest.java
index a69f19ad0e..ce4879c045 100644
--- a/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRspamdTaskTest.java
+++ b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRspamdTaskTest.java
@@ -29,8 +29,8 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayInputStream;
-import java.io.InputStream;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.time.Clock;
 import java.time.Instant;
 import java.time.ZonedDateTime;
@@ -71,6 +71,7 @@ import org.mockserver.integration.ClientAndServer;
 import org.mockserver.model.Delay;
 import org.mockserver.model.HttpRequest;
 import org.mockserver.model.HttpResponse;
+import org.reactivestreams.Publisher;
 
 import com.github.fge.lambdas.Throwing;
 
@@ -102,8 +103,8 @@ public class FeedSpamToRspamdTaskTest {
         }
 
         @Override
-        public Mono<Void> reportAsSpam(InputStream content) {
-            return Mono.fromCallable(() -> content)
+        public Mono<Void> reportAsSpam(Publisher<ByteBuffer> content) {
+            return Mono.from(content)
                 .doOnNext(e -> hitCounter.incrementAndGet())
                 .then();
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to