This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new c2dfd43ffa [REACTOR] Reactive content
c2dfd43ffa is described below
commit c2dfd43ffab1632407ba1d06f69ceab8cb254de1
Author: Benoit Tellier <[email protected]>
AuthorDate: Thu Oct 6 10:02:34 2022 +0700
[REACTOR] Reactive content
Co-authored-by: Quan Tran <[email protected]>
---
.../apache/james/mailbox/model/ByteContent.java | 11 +++++++
.../org/apache/james/mailbox/model/Content.java | 17 ++++++++++
.../org/apache/james/mailbox/model/Header.java | 11 +++++++
.../mailbox/model/HeaderAndBodyByteContent.java | 10 ++++++
.../james/mailbox/store/MessageResultImpl.java | 7 ++++
.../james/mailbox/store/MimeDescriptorImpl.java | 17 ++++++++++
.../james/mailbox/store/mail/model/Message.java | 37 ++++++++++++++++++++++
.../store/mail/model/impl/SimpleMessage.java | 22 +++++++++++++
.../mailbox/store/streaming/FullByteContent.java | 23 ++++++++++++--
.../store/streaming/InputStreamContent.java | 11 +++++++
.../james/imap/processor/fetch/EmptyContent.java | 9 ++++++
.../org/apache/james/rspamd/RspamdListener.java | 15 ++++-----
.../james/rspamd/client/RspamdHttpClient.java | 14 ++++----
.../james/rspamd/task/FeedHamToRspamdTask.java | 2 +-
.../james/rspamd/task/FeedSpamToRspamdTask.java | 2 +-
.../james/rspamd/client/RspamdHttpClientTest.java | 18 ++++++-----
.../james/rspamd/task/FeedHamToRspamdTaskTest.java | 7 ++--
.../rspamd/task/FeedSpamToRspamdTaskTest.java | 7 ++--
18 files changed, 207 insertions(+), 33 deletions(-)
diff --git
a/mailbox/api/src/main/java/org/apache/james/mailbox/model/ByteContent.java
b/mailbox/api/src/main/java/org/apache/james/mailbox/model/ByteContent.java
index 21e004a111..a14a7daa8a 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/model/ByteContent.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/ByteContent.java
@@ -24,6 +24,11 @@ package org.apache.james.mailbox.model;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
public final class ByteContent implements Content {
@@ -45,4 +50,10 @@ public final class ByteContent implements Content {
public InputStream getInputStream() {
return new ByteArrayInputStream(contents);
}
+
+ @Override
+ public Publisher<ByteBuffer> reactiveBytes() {
+ return Flux.just(contents)
+ .map(ByteBuffer::wrap);
+ }
}
diff --git
a/mailbox/api/src/main/java/org/apache/james/mailbox/model/Content.java
b/mailbox/api/src/main/java/org/apache/james/mailbox/model/Content.java
index 30328fb4db..abee852e0a 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/model/Content.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/Content.java
@@ -21,14 +21,22 @@ package org.apache.james.mailbox.model;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.util.ReactorUtils;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
/**
* IMAP needs to know the size of the content before it starts to write it out.
* This interface allows direct writing whilst exposing total size.
*/
public interface Content {
+ int BUFFER_SIZE = 16384;
+
/**
* Return the content as {@link InputStream}
*/
@@ -40,4 +48,13 @@ public interface Content {
* @return number of octets to be written
*/
long size() throws MailboxException;
+
+ default Publisher<ByteBuffer> reactiveBytes() {
+ try {
+ return ReactorUtils.toChunks(getInputStream(), BUFFER_SIZE)
+ .subscribeOn(Schedulers.boundedElastic());
+ } catch (IOException e) {
+ return Flux.error(e);
+ }
+ }
}
\ No newline at end of file
diff --git
a/mailbox/api/src/main/java/org/apache/james/mailbox/model/Header.java
b/mailbox/api/src/main/java/org/apache/james/mailbox/model/Header.java
index 71b2e4cf1f..c9dcc918a5 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/model/Header.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/Header.java
@@ -26,6 +26,11 @@ import static java.nio.charset.StandardCharsets.US_ASCII;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
/**
* A header.
@@ -72,4 +77,10 @@ public final class Header implements Content {
public InputStream getInputStream() {
return new ByteArrayInputStream((name + ": " +
value).getBytes(US_ASCII));
}
+
+ @Override
+ public Publisher<ByteBuffer> reactiveBytes() {
+ return Flux.just((name + ": " + value).getBytes(US_ASCII))
+ .map(ByteBuffer::wrap);
+ }
}
\ No newline at end of file
diff --git
a/mailbox/api/src/main/java/org/apache/james/mailbox/model/HeaderAndBodyByteContent.java
b/mailbox/api/src/main/java/org/apache/james/mailbox/model/HeaderAndBodyByteContent.java
index b929fc26f2..bbd1740d6e 100644
---
a/mailbox/api/src/main/java/org/apache/james/mailbox/model/HeaderAndBodyByteContent.java
+++
b/mailbox/api/src/main/java/org/apache/james/mailbox/model/HeaderAndBodyByteContent.java
@@ -22,6 +22,11 @@ package org.apache.james.mailbox.model;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.SequenceInputStream;
+import java.nio.ByteBuffer;
+
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
public final class HeaderAndBodyByteContent implements Content {
@@ -47,4 +52,9 @@ public final class HeaderAndBodyByteContent implements
Content {
new ByteArrayInputStream(headers),
new ByteArrayInputStream(body));
}
+
+ @Override
+ public Publisher<ByteBuffer> reactiveBytes() {
+ return Flux.concat(Flux.just(headers).map(ByteBuffer::wrap),
Flux.just(body).map(ByteBuffer::wrap));
+ }
}
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageResultImpl.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageResultImpl.java
index 168365b369..73866e4a20 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageResultImpl.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MessageResultImpl.java
@@ -21,6 +21,7 @@ package org.apache.james.mailbox.store;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
@@ -47,6 +48,7 @@ import
org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.streaming.InputStreamContent;
import org.apache.james.mailbox.store.streaming.InputStreamContent.Type;
import org.apache.james.mime4j.MimeException;
+import org.reactivestreams.Publisher;
import com.google.common.base.Objects;
@@ -346,6 +348,11 @@ public class MessageResultImpl implements MessageResult {
return msg.getHeaderContent();
}
+ @Override
+ public Publisher<ByteBuffer> reactiveBytes() {
+ return msg.getHeaderContentReactive();
+ }
+
@Override
public long size() {
return msg.getHeaderOctets();
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MimeDescriptorImpl.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MimeDescriptorImpl.java
index 609d2f0791..0199aaaa42 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/MimeDescriptorImpl.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/MimeDescriptorImpl.java
@@ -24,6 +24,7 @@ import static java.nio.charset.StandardCharsets.US_ASCII;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -44,6 +45,10 @@ import org.apache.james.mime4j.stream.EntityState;
import org.apache.james.mime4j.stream.MimeConfig;
import org.apache.james.mime4j.stream.MimeTokenStream;
import org.apache.james.mime4j.stream.RecursionMode;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class MimeDescriptorImpl implements MimeDescriptor {
@@ -315,6 +320,18 @@ public class MimeDescriptorImpl implements MimeDescriptor {
return new ByteArrayInputStream(sb.toString().getBytes(US_ASCII));
}
+ @Override
+ public Publisher<ByteBuffer> reactiveBytes() {
+ return Mono.fromCallable(() -> {
+ StringBuilder sb = new StringBuilder();
+ for (Header header : headers) {
+ sb.append(header.getName()).append(":
").append(header.getValue()).append("\r\n");
+ }
+ sb.append("\r\n");
+ return sb.toString().getBytes(US_ASCII);
+ }).flatMapMany(bytes -> Flux.just(bytes).map(ByteBuffer::wrap));
+ }
+
@Override
public long size() throws MailboxException {
long result = 0;
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/Message.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/Message.java
index fef05fbe3a..a5f37bb198 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/Message.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/Message.java
@@ -18,14 +18,22 @@
****************************************************************/
package org.apache.james.mailbox.store.mail.model;
+import static org.apache.james.mailbox.model.Content.BUFFER_SIZE;
+
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.Date;
import java.util.List;
import org.apache.james.mailbox.model.MessageAttachmentMetadata;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.store.mail.model.impl.Properties;
+import org.apache.james.util.ReactorUtils;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
public interface Message {
@@ -42,6 +50,15 @@ public interface Message {
*/
InputStream getBodyContent() throws IOException;
+ default Publisher<ByteBuffer> getBodyContentReactive() {
+ try {
+ return ReactorUtils.toChunks(getBodyContent(), BUFFER_SIZE)
+ .subscribeOn(Schedulers.boundedElastic());
+ } catch (IOException e) {
+ return Flux.error(e);
+ }
+ }
+
/**
* Gets the top level MIME content media type.
*
@@ -86,6 +103,16 @@ public interface Message {
*/
InputStream getHeaderContent() throws IOException;
+
+ default Publisher<ByteBuffer> getHeaderContentReactive() {
+ try {
+ return ReactorUtils.toChunks(getHeaderContent(), BUFFER_SIZE)
+ .subscribeOn(Schedulers.boundedElastic());
+ } catch (IOException e) {
+ return Flux.error(e);
+ }
+ }
+
/**
* Returns the full raw content of the MailboxMessage via an {@link
InputStream}.
*
@@ -94,6 +121,16 @@ public interface Message {
*/
InputStream getFullContent() throws IOException;
+
+ default Publisher<ByteBuffer> getFullContentReactive() {
+ try {
+ return ReactorUtils.toChunks(getFullContent(), BUFFER_SIZE)
+ .subscribeOn(Schedulers.boundedElastic());
+ } catch (IOException e) {
+ return Flux.error(e);
+ }
+ }
+
/**
* Gets a read-only list of meta-data properties.
* For properties with multiple values, this list will contain
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/SimpleMessage.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/SimpleMessage.java
index 4ebf1ba0ce..add87a967d 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/SimpleMessage.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/SimpleMessage.java
@@ -20,14 +20,19 @@ package org.apache.james.mailbox.store.mail.model.impl;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.Date;
import java.util.List;
import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.Content;
import org.apache.james.mailbox.model.MessageAttachmentMetadata;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.store.mail.model.Message;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
public class SimpleMessage implements Message {
@@ -121,4 +126,21 @@ public class SimpleMessage implements Message {
public List<MessageAttachmentMetadata> getAttachments() {
return attachments;
}
+
+ @Override
+ public Publisher<ByteBuffer> getHeaderContentReactive() {
+ try {
+ if (bodyStartOctet >= content.size()) {
+ return content.reactiveBytes();
+ }
+ } catch (MailboxException e) {
+ return Flux.error(e);
+ }
+ return Message.super.getHeaderContentReactive();
+ }
+
+ @Override
+ public Publisher<ByteBuffer> getFullContentReactive() {
+ return content.reactiveBytes();
+ }
}
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/FullByteContent.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/FullByteContent.java
index 7d11a5388f..fcf077a452 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/FullByteContent.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/FullByteContent.java
@@ -24,12 +24,17 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
import org.apache.james.mailbox.model.Content;
import org.apache.james.mailbox.model.Header;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* Abstract base class for {@link Content} implementations which hold the
headers and
@@ -82,7 +87,21 @@ public class FullByteContent implements Content {
public long size() {
return size;
}
-
-
+ @Override
+ public Publisher<ByteBuffer> reactiveBytes() {
+ Flux<ByteBuffer> headerContent = Mono.fromCallable(() -> {
+ StringBuilder sb = new StringBuilder();
+ for (Header header : headers) {
+
sb.append(header.getName()).append(NAME_DELIMITER).append(header.getValue()).append(END_OF_LINE);
+ }
+ sb.append(END_OF_LINE);
+ return sb.toString().getBytes(US_ASCII);
+ })
+ .flatMapMany(bytes -> Flux.just(bytes).map(ByteBuffer::wrap));
+
+ Flux<ByteBuffer> bodyContent = Flux.just(body).map(ByteBuffer::wrap);
+
+ return Flux.concat(headerContent, bodyContent);
+ }
}
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/InputStreamContent.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/InputStreamContent.java
index 1cf8e56ae3..5a2662095a 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/InputStreamContent.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/InputStreamContent.java
@@ -20,9 +20,11 @@ package org.apache.james.mailbox.store.streaming;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import org.apache.james.mailbox.model.Content;
import org.apache.james.mailbox.store.mail.model.Message;
+import org.reactivestreams.Publisher;
/**
* {@link Content} which is stored in a {@link InputStream}
@@ -65,4 +67,13 @@ public final class InputStreamContent implements Content {
}
+ @Override
+ public Publisher<ByteBuffer> reactiveBytes() {
+ switch (type) {
+ case FULL:
+ return m.getFullContentReactive();
+ default:
+ return m.getBodyContentReactive();
+ }
+ }
}
diff --git
a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/EmptyContent.java
b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/EmptyContent.java
index 35b09a2832..6707882b48 100644
---
a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/EmptyContent.java
+++
b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/EmptyContent.java
@@ -21,8 +21,12 @@ package org.apache.james.imap.processor.fetch;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import org.apache.james.mailbox.model.Content;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
/**
* Just an Empty {@link Content}
@@ -44,4 +48,9 @@ public class EmptyContent implements Content {
return new ByteArrayInputStream(EMPTY_ARRAY);
}
+ @Override
+ public Publisher<ByteBuffer> reactiveBytes() {
+ return Flux.just(EMPTY_ARRAY)
+ .map(ByteBuffer::wrap);
+ }
}
diff --git
a/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
index 61a505d88a..9562488b72 100644
---
a/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
+++
b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
@@ -20,7 +20,7 @@
package org.apache.james.rspamd;
-import java.io.InputStream;
+import java.nio.ByteBuffer;
import javax.inject.Inject;
@@ -48,7 +48,6 @@ import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import reactor.core.publisher.Flux;
@@ -114,18 +113,18 @@ public class RspamdListener implements SpamEventListener,
EventListener.Reactive
.map(mailbox -> Pair.of(mailbox,
mapperFactory.getMessageMapper(session)))
.flatMapMany(pair ->
Flux.fromIterable(MessageRange.toRanges(addedEvent.getUids()))
.flatMap(range ->
pair.getRight().findInMailboxReactive(pair.getLeft(), range,
MessageMapper.FetchType.FULL, LIMIT)))
- .map(Throwing.function(MailboxMessage::getFullContent))
+ .map(MailboxMessage::getFullContentReactive)
.flatMap(rspamdHttpClient::reportAsHam,
ReactorUtils.DEFAULT_CONCURRENCY)
.then();
}
- private Flux<InputStream> mailboxMessagePublisher(MessageMoveEvent
messageMoveEvent) {
+ private Flux<ByteBuffer> mailboxMessagePublisher(MessageMoveEvent
messageMoveEvent) {
return Mono.fromCallable(() ->
mapperFactory.getMessageIdMapper(mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()))))
.flatMapMany(messageIdMapper ->
messageIdMapper.findReactive(messageMoveEvent.getMessageIds(),
MessageMapper.FetchType.FULL))
- .map(Throwing.function(MailboxMessage::getFullContent));
+ .flatMap(MailboxMessage::getFullContentReactive);
}
- private Mono<Void> handleMessageMoved(Flux<InputStream>
mailboxMessagesPublisher, MessageMoveEvent messageMoveEvent) {
+ private Mono<Void> handleMessageMoved(Flux<ByteBuffer>
mailboxMessagesPublisher, MessageMoveEvent messageMoveEvent) {
Mono<Boolean> reportHamIfNotSpamDetected =
isMessageMovedOutOfSpamMailbox(messageMoveEvent)
.filter(FunctionalUtils.identityPredicate())
.doOnNext(isHam -> LOGGER.debug("Ham event detected, EventId =
{}", messageMoveEvent.getEventId().getId()));
@@ -134,11 +133,11 @@ public class RspamdListener implements SpamEventListener,
EventListener.Reactive
.flatMap(isSpam -> {
if (isSpam) {
LOGGER.debug("Spam event detected, EventId = {}",
messageMoveEvent.getEventId().getId());
- return
mailboxMessagesPublisher.flatMap(rspamdHttpClient::reportAsSpam,
ReactorUtils.DEFAULT_CONCURRENCY)
+ return
rspamdHttpClient.reportAsSpam(mailboxMessagesPublisher)
.then();
} else {
return reportHamIfNotSpamDetected
- .flatMapMany(isHam ->
mailboxMessagesPublisher.flatMap(rspamdHttpClient::reportAsHam,
ReactorUtils.DEFAULT_CONCURRENCY))
+ .flatMapMany(isHam ->
rspamdHttpClient.reportAsHam(mailboxMessagesPublisher))
.then();
}
});
diff --git
a/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RspamdHttpClient.java
b/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RspamdHttpClient.java
index 4fb758edff..1499f5f79a 100644
---
a/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RspamdHttpClient.java
+++
b/third-party/rspamd/src/main/java/org/apache/james/rspamd/client/RspamdHttpClient.java
@@ -21,7 +21,7 @@ package org.apache.james.rspamd.client;
import static
org.apache.james.rspamd.client.RspamdClientConfiguration.DEFAULT_TIMEOUT_IN_SECONDS;
-import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Optional;
@@ -36,6 +36,7 @@ import org.apache.james.server.core.MimeMessageInputStream;
import org.apache.james.util.ReactorUtils;
import org.apache.mailet.AttributeName;
import org.apache.mailet.Mail;
+import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +47,7 @@ import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;
import io.netty.buffer.Unpooled;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufMono;
import reactor.netty.http.client.HttpClient;
@@ -82,11 +84,11 @@ public class RspamdHttpClient {
.subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
}
- public Mono<Void> reportAsSpam(InputStream content) {
+ public Mono<Void> reportAsSpam(Publisher<ByteBuffer> content) {
return reportMail(content, LEARN_SPAM_ENDPOINT);
}
- public Mono<Void> reportAsHam(InputStream content) {
+ public Mono<Void> reportAsHam(Publisher<ByteBuffer> content) {
return reportMail(content, LEARN_HAM_ENDPOINT);
}
@@ -127,12 +129,10 @@ public class RspamdHttpClient {
.headers(headers -> headers.add("Password",
configuration.getPassword()));
}
- private Mono<Void> reportMail(InputStream content, String endpoint) {
+ private Mono<Void> reportMail(Publisher<ByteBuffer> content, String
endpoint) {
return httpClient.post()
.uri(endpoint)
- .send(ReactorUtils.toChunks(content, BUFFER_SIZE)
- .map(Unpooled::wrappedBuffer)
- .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER))
+ .send(Flux.from(content).map(Unpooled::wrappedBuffer))
.responseSingle(this::reportMailHttpResponseHandler);
}
diff --git
a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java
b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java
index 36fef077aa..d7f578b993 100644
---
a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java
+++
b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedHamToRspamdTask.java
@@ -239,7 +239,7 @@ public class FeedHamToRspamdTask implements Task {
.transform(ReactorUtils.<MessageResult, Result>throttle()
.elements(runningOptions.getMessagesPerSecond())
.per(Duration.ofSeconds(1))
- .forOperation(messageResult ->
rspamdHttpClient.reportAsHam(Throwing.supplier(() ->
messageResult.getFullContent().getInputStream()).get())
+ .forOperation(messageResult ->
rspamdHttpClient.reportAsHam(Throwing.supplier(() ->
messageResult.getFullContent().reactiveBytes()).get())
.timeout(runningOptions.getRspamdTimeout())
.then(Mono.fromCallable(() -> {
context.incrementReportedHamMessageCount(1);
diff --git
a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java
b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java
index 8df41d4369..dcb6770674 100644
---
a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java
+++
b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/FeedSpamToRspamdTask.java
@@ -240,7 +240,7 @@ public class FeedSpamToRspamdTask implements Task {
.transform(ReactorUtils.<MessageResult, Task.Result>throttle()
.elements(runningOptions.getMessagesPerSecond())
.per(Duration.ofSeconds(1))
- .forOperation(messageResult ->
rspamdHttpClient.reportAsSpam(Throwing.supplier(() ->
messageResult.getFullContent().getInputStream()).get())
+ .forOperation(messageResult ->
rspamdHttpClient.reportAsSpam(Throwing.supplier(() ->
messageResult.getFullContent().reactiveBytes()).get())
.timeout(runningOptions.getRspamdTimeout())
.then(Mono.fromCallable(() -> {
context.incrementReportedSpamMessageCount(1);
diff --git
a/third-party/rspamd/src/test/java/org/apache/james/rspamd/client/RspamdHttpClientTest.java
b/third-party/rspamd/src/test/java/org/apache/james/rspamd/client/RspamdHttpClientTest.java
index 3271e9b064..bbbbae88ec 100644
---
a/third-party/rspamd/src/test/java/org/apache/james/rspamd/client/RspamdHttpClientTest.java
+++
b/third-party/rspamd/src/test/java/org/apache/james/rspamd/client/RspamdHttpClientTest.java
@@ -19,6 +19,7 @@
package org.apache.james.rspamd.client;
+import static org.apache.james.mailbox.model.Content.BUFFER_SIZE;
import static org.apache.james.rspamd.DockerRspamd.PASSWORD;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
@@ -37,6 +38,7 @@ import
org.apache.james.rspamd.exception.UnauthorizedException;
import org.apache.james.rspamd.model.AnalysisResult;
import org.apache.james.util.MimeMessageUtil;
import org.apache.james.util.Port;
+import org.apache.james.util.ReactorUtils;
import org.apache.james.webadmin.WebAdminUtils;
import org.apache.mailet.Mail;
import org.apache.mailet.base.test.FakeMail;
@@ -170,7 +172,7 @@ class RspamdHttpClientTest {
RspamdClientConfiguration configuration = new
RspamdClientConfiguration(rspamdExtension.getBaseUrl(), PASSWORD,
Optional.empty());
RspamdHttpClient client = new RspamdHttpClient(configuration);
- assertThatCode(() ->
client.reportAsSpam(spamMessage.getMessage().getInputStream()).block())
+ assertThatCode(() ->
client.reportAsSpam(ReactorUtils.toChunks(spamMessage.getMessage().getInputStream(),
BUFFER_SIZE)).block())
.doesNotThrowAnyException();
}
@@ -179,7 +181,7 @@ class RspamdHttpClientTest {
RspamdClientConfiguration configuration = new
RspamdClientConfiguration(rspamdExtension.getBaseUrl(), PASSWORD,
Optional.empty());
RspamdHttpClient client = new RspamdHttpClient(configuration);
- assertThatCode(() ->
client.reportAsHam(hamMessage.getMessage().getInputStream()).block())
+ assertThatCode(() ->
client.reportAsHam(ReactorUtils.toChunks(hamMessage.getMessage().getInputStream(),
BUFFER_SIZE)).block())
.doesNotThrowAnyException();
}
@@ -188,8 +190,8 @@ class RspamdHttpClientTest {
RspamdClientConfiguration configuration = new
RspamdClientConfiguration(rspamdExtension.getBaseUrl(), PASSWORD,
Optional.empty());
RspamdHttpClient client = new RspamdHttpClient(configuration);
- client.reportAsHam(hamMessage.getMessage().getInputStream()).block();
- assertThatCode(() ->
client.reportAsHam(hamMessage.getMessage().getInputStream()).block())
+
client.reportAsHam(ReactorUtils.toChunks(hamMessage.getMessage().getInputStream(),
BUFFER_SIZE)).block();
+ assertThatCode(() ->
client.reportAsHam(ReactorUtils.toChunks(hamMessage.getMessage().getInputStream(),
BUFFER_SIZE)).block())
.doesNotThrowAnyException();
}
@@ -198,8 +200,8 @@ class RspamdHttpClientTest {
RspamdClientConfiguration configuration = new
RspamdClientConfiguration(rspamdExtension.getBaseUrl(), PASSWORD,
Optional.empty());
RspamdHttpClient client = new RspamdHttpClient(configuration);
- client.reportAsSpam(spamMessage.getMessage().getInputStream()).block();
- assertThatCode(() ->
client.reportAsSpam(spamMessage.getMessage().getInputStream()).block())
+
client.reportAsSpam(ReactorUtils.toChunks(spamMessage.getMessage().getInputStream(),
BUFFER_SIZE)).block();
+ assertThatCode(() ->
client.reportAsSpam(ReactorUtils.toChunks(spamMessage.getMessage().getInputStream(),
BUFFER_SIZE)).block())
.doesNotThrowAnyException();
}
@@ -222,11 +224,11 @@ class RspamdHttpClientTest {
}
private void reportAsSpam(RspamdHttpClient client, InputStream
inputStream) {
- client.reportAsSpam(inputStream).block();
+ client.reportAsSpam(ReactorUtils.toChunks(inputStream,
BUFFER_SIZE)).block();
}
private void reportAsHam(RspamdHttpClient client, InputStream inputStream)
{
- client.reportAsHam(inputStream).block();
+ client.reportAsHam(ReactorUtils.toChunks(inputStream,
BUFFER_SIZE)).block();
}
}
diff --git
a/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRspamdTaskTest.java
b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRspamdTaskTest.java
index 87ddd12f16..b1c27db7f5 100644
---
a/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRspamdTaskTest.java
+++
b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRspamdTaskTest.java
@@ -29,8 +29,8 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayInputStream;
-import java.io.InputStream;
import java.net.URL;
+import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.Instant;
import java.time.ZonedDateTime;
@@ -71,6 +71,7 @@ import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.Delay;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
+import org.reactivestreams.Publisher;
import com.github.fge.lambdas.Throwing;
@@ -103,8 +104,8 @@ public class FeedHamToRspamdTaskTest {
}
@Override
- public Mono<Void> reportAsHam(InputStream content) {
- return Mono.fromCallable(() -> content)
+ public Mono<Void> reportAsHam(Publisher<ByteBuffer> content) {
+ return Mono.from(content)
.doOnNext(e -> hitCounter.incrementAndGet())
.then();
}
diff --git
a/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRspamdTaskTest.java
b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRspamdTaskTest.java
index a69f19ad0e..ce4879c045 100644
---
a/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRspamdTaskTest.java
+++
b/third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedSpamToRspamdTaskTest.java
@@ -29,8 +29,8 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayInputStream;
-import java.io.InputStream;
import java.net.URL;
+import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.Instant;
import java.time.ZonedDateTime;
@@ -71,6 +71,7 @@ import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.Delay;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
+import org.reactivestreams.Publisher;
import com.github.fge.lambdas.Throwing;
@@ -102,8 +103,8 @@ public class FeedSpamToRspamdTaskTest {
}
@Override
- public Mono<Void> reportAsSpam(InputStream content) {
- return Mono.fromCallable(() -> content)
+ public Mono<Void> reportAsSpam(Publisher<ByteBuffer> content) {
+ return Mono.from(content)
.doOnNext(e -> hitCounter.incrementAndGet())
.then();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]