This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 643cd023a6bbe57094fb03ed55a76f44f1d050bc
Author: Benoit Tellier <[email protected]>
AuthorDate: Wed Mar 2 14:31:25 2022 +0700

    JAMES-3719 Reactive textual content extraction with Apache Tika
    
    Tika was called from reactive code and was doing blocking HTTP calls from 
within
    the MIME parsing code.
    
    This generates:
     - An unneeded thread consumption as we have some threads waiting for Tika
       response
     - Potentially dangerous blocking calls: for instance the InVM event bus was
      doing such calls on the parallel thread pool (where it is critical NOT to
      block)...
     - Also the connection was opened on a per-call basis, not being reused.
    
     We introduce the following changes:
      - Reactification of the TextExtractor API
      - We re-implement the HTTP calls done by TikaTextExtractor with 
reactor-netty
      which allows us to pool HTTP connections and do this in a non-blocking
      reactive fashion.
      - We provide a reactive cache using the caffeine caching library - Guava
      caches are blocking and thus not an option...
      - We uncouple the text extraction from the MIME parsing phase by 
introducing
      an intermediate POJO. Doing so requires us to do a post-parsing copy of
      content.
    
     TODO only do the copy if necessary. We don't want to copy large 
attachments for which no text is going to be extracted...
    
      - Finally we reactify index content generation for ElasticSearch code.
---
 .../james/mailbox/extractor/TextExtractor.java     |   6 +
 .../ElasticSearchListeningMessageSearchIndex.java  |  25 ++--
 .../elasticsearch/v7/json/IndexableMessage.java    | 138 +++++++++++----------
 .../v7/json/MessageToElasticSearchJson.java        |  33 ++---
 .../mailbox/elasticsearch/v7/json/MimePart.java    |  82 ++++++++----
 .../v7/json/MimePartContainerBuilder.java          |   4 +-
 .../elasticsearch/v7/json/MimePartParser.java      |   6 +-
 .../v7/json/RootMimePartContainerBuilder.java      |   6 +-
 .../v7/json/IndexableMessageTest.java              |  40 +++---
 .../v7/json/MessageToElasticSearchJsonTest.java    |  28 ++---
 .../elasticsearch/v7/json/MimePartTest.java        |   5 +-
 mailbox/tika/pom.xml                               |   8 ++
 .../james/mailbox/tika/CachingTextExtractor.java   |  76 ++++++------
 .../tika/ContentTypeFilteringTextExtractor.java    |  10 ++
 .../apache/james/mailbox/tika/TikaHttpClient.java  |   5 +-
 .../james/mailbox/tika/TikaHttpClientImpl.java     |  57 ++++++---
 .../james/mailbox/tika/TikaTextExtractor.java      |  28 +++--
 .../mailbox/tika/CachingTextExtractorTest.java     |  80 +++---------
 .../james/mailbox/tika/TikaTextExtractorTest.java  |   9 +-
 pom.xml                                            |   5 +
 20 files changed, 363 insertions(+), 288 deletions(-)

diff --git 
a/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java
 
b/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java
index 6b911225b6..2fdf7e555e 100644
--- 
a/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java
+++ 
b/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java
@@ -23,8 +23,14 @@ import java.io.InputStream;
 
 import org.apache.james.mailbox.model.ContentType;
 
+import reactor.core.publisher.Mono;
+
 public interface TextExtractor {
 
     ParsedContent extractContent(InputStream inputStream, ContentType 
contentType) throws Exception;
 
+    default Mono<ParsedContent> extractContentReactive(InputStream 
inputStream, ContentType contentType) {
+        return Mono.fromCallable(() -> extractContent(inputStream, 
contentType));
+    }
+
 }
diff --git 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/events/ElasticSearchListeningMessageSearchIndex.java
 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/events/ElasticSearchListeningMessageSearchIndex.java
index 591a5ddcb5..29ef16119b 100644
--- 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/events/ElasticSearchListeningMessageSearchIndex.java
+++ 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/events/ElasticSearchListeningMessageSearchIndex.java
@@ -162,23 +162,22 @@ public class ElasticSearchListeningMessageSearchIndex 
extends ListeningMessageSe
         RoutingKey from = routingKeyFactory.from(mailbox.getMailboxId());
         DocumentId id = indexIdFor(mailbox.getMailboxId(), message.getUid());
 
-        return Mono.fromCallable(() -> generateIndexedJson(mailbox, message, 
session))
+        return generateIndexedJson(mailbox, message, session)
             .flatMap(jsonContent -> elasticSearchIndexer.index(id, 
jsonContent, from))
             .then();
     }
 
-    private String generateIndexedJson(Mailbox mailbox, MailboxMessage 
message, MailboxSession session) throws JsonProcessingException {
-        try {
-            return messageToElasticSearchJson.convertToJson(message);
-        } catch (Exception e) {
-            LOGGER.warn("Indexing mailbox {}-{} of user {} on message {} 
without attachments ",
-                mailbox.getName(),
-                mailbox.getMailboxId().serialize(),
-                session.getUser().asString(),
-                message.getUid(),
-                e);
-            return 
messageToElasticSearchJson.convertToJsonWithoutAttachment(message);
-        }
+    private Mono<String> generateIndexedJson(Mailbox mailbox, MailboxMessage 
message, MailboxSession session) {
+        return messageToElasticSearchJson.convertToJson(message)
+            .onErrorResume(e -> {
+                LOGGER.warn("Indexing mailbox {}-{} of user {} on message {} 
without attachments ",
+                    mailbox.getName(),
+                    mailbox.getMailboxId().serialize(),
+                    session.getUser().asString(),
+                    message.getUid(),
+                    e);
+                return 
messageToElasticSearchJson.convertToJsonWithoutAttachment(message);
+            });
     }
 
     @Override
diff --git 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessage.java
 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessage.java
index 072513fbaa..d224b43d56 100644
--- 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessage.java
+++ 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessage.java
@@ -39,6 +39,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Mono;
+
 public class IndexableMessage {
 
     private static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ");
@@ -62,7 +64,7 @@ public class IndexableMessage {
         private Builder() {
         }
 
-        public IndexableMessage build() {
+        public Mono<IndexableMessage> build() {
             Preconditions.checkNotNull(message.getMailboxId());
             Preconditions.checkNotNull(textExtractor);
             Preconditions.checkNotNull(indexAttachments);
@@ -95,73 +97,77 @@ public class IndexableMessage {
             return this;
         }
 
-        private IndexableMessage instantiateIndexedMessage() throws 
IOException, MimeException {
+        private Mono<IndexableMessage> instantiateIndexedMessage() throws 
IOException, MimeException {
             String messageId = 
SearchUtil.getSerializedMessageIdIfSupportedByUnderlyingStorageOrNull(message);
             String threadId = 
SearchUtil.getSerializedThreadIdIfSupportedByUnderlyingStorageOrNull(message);
-            MimePart parsingResult = new MimePartParser(message, 
textExtractor).parse();
-
-            Optional<String> bodyText = parsingResult.locateFirstTextBody();
-            Optional<String> bodyHtml = parsingResult.locateFirstHtmlBody();
-
-            boolean hasAttachment = 
MessageAttachmentMetadata.hasNonInlinedAttachment(message.getAttachments());
-            List<MimePart> attachments = 
setFlattenedAttachments(parsingResult, indexAttachments);
-
-            HeaderCollection headerCollection = 
parsingResult.getHeaderCollection();
-            ZonedDateTime internalDate = getSanitizedInternalDate(message, 
zoneId);
-
-            List<HeaderCollection.Header> headers = 
headerCollection.getHeaders();
-            Subjects subjects = 
Subjects.from(headerCollection.getSubjectSet());
-            EMailers from = 
EMailers.from(headerCollection.getFromAddressSet());
-            EMailers to = EMailers.from(headerCollection.getToAddressSet());
-            EMailers cc = EMailers.from(headerCollection.getCcAddressSet());
-            EMailers bcc = EMailers.from(headerCollection.getBccAddressSet());
-            String sentDate = 
DATE_TIME_FORMATTER.format(headerCollection.getSentDate().orElse(internalDate));
-            Optional<String> mimeMessageID = headerCollection.getMessageID();
-
-            long uid = message.getUid().asLong();
-            String mailboxId = message.getMailboxId().serialize();
-            ModSeq modSeq = message.getModSeq();
-            long size = message.getFullContentOctets();
-            String date = 
DATE_TIME_FORMATTER.format(getSanitizedInternalDate(message, zoneId));
-            String mediaType = message.getMediaType();
-            String subType = message.getSubType();
-            boolean isAnswered = message.isAnswered();
-            boolean isDeleted = message.isDeleted();
-            boolean isDraft = message.isDraft();
-            boolean isFlagged = message.isFlagged();
-            boolean isRecent = message.isRecent();
-            boolean isUnRead = !message.isSeen();
-            String[] userFlags = message.createFlags().getUserFlags();
-
-            return new IndexableMessage(
-                    attachments,
-                    bcc,
-                    bodyHtml,
-                    bodyText,
-                    cc,
-                    date,
-                    from,
-                    hasAttachment,
-                    headers,
-                    isAnswered,
-                    isDeleted,
-                    isDraft,
-                    isFlagged,
-                    isRecent,
-                    isUnRead,
-                    mailboxId,
-                    mediaType,
-                    messageId,
-                    threadId,
-                    modSeq,
-                    sentDate,
-                    size,
-                    subjects,
-                    subType,
-                    to,
-                    uid,
-                    userFlags,
-                    mimeMessageID);
+
+            return new MimePartParser(message, textExtractor).parse()
+                .asMimePart(textExtractor)
+                .map(parsingResult -> {
+
+                    Optional<String> bodyText = 
parsingResult.locateFirstTextBody();
+                    Optional<String> bodyHtml = 
parsingResult.locateFirstHtmlBody();
+
+                    boolean hasAttachment = 
MessageAttachmentMetadata.hasNonInlinedAttachment(message.getAttachments());
+                    List<MimePart> attachments = 
setFlattenedAttachments(parsingResult, indexAttachments);
+
+                    HeaderCollection headerCollection = 
parsingResult.getHeaderCollection();
+                    ZonedDateTime internalDate = 
getSanitizedInternalDate(message, zoneId);
+
+                    List<HeaderCollection.Header> headers = 
headerCollection.getHeaders();
+                    Subjects subjects = 
Subjects.from(headerCollection.getSubjectSet());
+                    EMailers from = 
EMailers.from(headerCollection.getFromAddressSet());
+                    EMailers to = 
EMailers.from(headerCollection.getToAddressSet());
+                    EMailers cc = 
EMailers.from(headerCollection.getCcAddressSet());
+                    EMailers bcc = 
EMailers.from(headerCollection.getBccAddressSet());
+                    String sentDate = 
DATE_TIME_FORMATTER.format(headerCollection.getSentDate().orElse(internalDate));
+                    Optional<String> mimeMessageID = 
headerCollection.getMessageID();
+
+                    long uid = message.getUid().asLong();
+                    String mailboxId = message.getMailboxId().serialize();
+                    ModSeq modSeq = message.getModSeq();
+                    long size = message.getFullContentOctets();
+                    String date = 
DATE_TIME_FORMATTER.format(getSanitizedInternalDate(message, zoneId));
+                    String mediaType = message.getMediaType();
+                    String subType = message.getSubType();
+                    boolean isAnswered = message.isAnswered();
+                    boolean isDeleted = message.isDeleted();
+                    boolean isDraft = message.isDraft();
+                    boolean isFlagged = message.isFlagged();
+                    boolean isRecent = message.isRecent();
+                    boolean isUnRead = !message.isSeen();
+                    String[] userFlags = message.createFlags().getUserFlags();
+
+                    return new IndexableMessage(
+                        attachments,
+                        bcc,
+                        bodyHtml,
+                        bodyText,
+                        cc,
+                        date,
+                        from,
+                        hasAttachment,
+                        headers,
+                        isAnswered,
+                        isDeleted,
+                        isDraft,
+                        isFlagged,
+                        isRecent,
+                        isUnRead,
+                        mailboxId,
+                        mediaType,
+                        messageId,
+                        threadId,
+                        modSeq,
+                        sentDate,
+                        size,
+                        subjects,
+                        subType,
+                        to,
+                        uid,
+                        userFlags,
+                        mimeMessageID);
+                });
         }
 
         private List<MimePart> setFlattenedAttachments(MimePart parsingResult, 
IndexAttachments indexAttachments) {
diff --git 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJson.java
 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJson.java
index 6a8168c34c..564d750083 100644
--- 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJson.java
+++ 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJson.java
@@ -33,8 +33,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.guava.GuavaModule;
 import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Mono;
+
 public class MessageToElasticSearchJson {
 
     private final ObjectMapper mapper;
@@ -56,24 +59,26 @@ public class MessageToElasticSearchJson {
         this(textExtractor, ZoneId.systemDefault(), indexAttachments);
     }
 
-    public String convertToJson(MailboxMessage message) throws 
JsonProcessingException {
+    public Mono<String> convertToJson(MailboxMessage message) {
         Preconditions.checkNotNull(message);
 
-        return mapper.writeValueAsString(IndexableMessage.builder()
-                .message(message)
-                .extractor(textExtractor)
-                .zoneId(zoneId)
-                .indexAttachments(indexAttachments)
-                .build());
+        return IndexableMessage.builder()
+            .message(message)
+            .extractor(textExtractor)
+            .zoneId(zoneId)
+            .indexAttachments(indexAttachments)
+            .build()
+            .map(Throwing.function(mapper::writeValueAsString));
     }
 
-    public String convertToJsonWithoutAttachment(MailboxMessage message) 
throws JsonProcessingException {
-        return mapper.writeValueAsString(IndexableMessage.builder()
-                .message(message)
-                .extractor(textExtractor)
-                .zoneId(zoneId)
-                .indexAttachments(IndexAttachments.NO)
-                .build());
+    public Mono<String> convertToJsonWithoutAttachment(MailboxMessage message) 
{
+        return IndexableMessage.builder()
+            .message(message)
+            .extractor(textExtractor)
+            .zoneId(zoneId)
+            .indexAttachments(IndexAttachments.NO)
+            .build()
+            .map(Throwing.function(mapper::writeValueAsString));
     }
 
     public String getUpdatedJsonMessagePart(Flags flags, ModSeq modSeq) throws 
JsonProcessingException {
diff --git 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java
 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java
index 722bd75756..918306fddf 100644
--- 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java
+++ 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.elasticsearch.v7.json;
 
+import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -40,17 +41,21 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class MimePart {
 
     public static class Builder implements MimePartContainerBuilder {
 
         private final HeaderCollection.Builder headerCollectionBuilder;
         private Optional<InputStream> bodyContent;
-        private final List<MimePart> children;
+        private final List<ParsedMimePart> children;
         private Optional<MediaType> mediaType;
         private Optional<SubType> subType;
         private Optional<String> fileName;
@@ -85,7 +90,7 @@ public class MimePart {
         }
 
         @Override
-        public Builder addChild(MimePart mimePart) {
+        public Builder addChild(ParsedMimePart mimePart) {
             children.add(mimePart);
             return this;
         }
@@ -129,11 +134,11 @@ public class MimePart {
         }
 
         @Override
-        public MimePart build() {
-            Optional<ParsedContent> parsedContent = 
parseContent(textExtractor);
-            return new MimePart(
+        public ParsedMimePart build() {
+            return new ParsedMimePart(
                 headerCollectionBuilder.build(),
-                parsedContent.flatMap(ParsedContent::getTextualContent),
+                bodyContent,
+                charset,
                 mediaType,
                 subType,
                 fileName,
@@ -141,27 +146,61 @@ public class MimePart {
                 contentDisposition,
                 children);
         }
+    }
 
-        private Optional<ParsedContent> parseContent(TextExtractor 
textExtractor) {
-            if (bodyContent.isPresent()) {
-                try {
-                    return Optional.of(extractText(textExtractor, 
bodyContent.get()));
-                } catch (Exception e) {
-                    LOGGER.warn("Failed parsing attachment", e);
-                }
-            }
-            return Optional.empty();
+    public static class ParsedMimePart {
+        private final HeaderCollection headerCollection;
+        private final Optional<byte[]> bodyContent;
+        private final Optional<Charset> charset;
+        private final Optional<MediaType> mediaType;
+        private final Optional<SubType> subType;
+        private final Optional<String> fileName;
+        private final Optional<String> fileExtension;
+        private final Optional<String> contentDisposition;
+        private final List<ParsedMimePart> attachments;
+
+        public ParsedMimePart(HeaderCollection headerCollection, 
Optional<InputStream> bodyContent, Optional<Charset> charset,
+                              Optional<MediaType> mediaType,
+                              Optional<SubType> subType, Optional<String> 
fileName, Optional<String> fileExtension,
+                              Optional<String> contentDisposition, 
List<ParsedMimePart> attachments) {
+            this.headerCollection = headerCollection;
+            this.bodyContent = 
bodyContent.map(Throwing.function(IOUtils::toByteArray));
+            this.mediaType = mediaType;
+            this.subType = subType;
+            this.fileName = fileName;
+            this.fileExtension = fileExtension;
+            this.contentDisposition = contentDisposition;
+            this.attachments = attachments;
+            this.charset = charset;
         }
 
-        private ParsedContent extractText(TextExtractor textExtractor, 
InputStream bodyContent) throws Exception {
+        public Mono<MimePart> asMimePart(TextExtractor textExtractor) {
+            return Flux.fromIterable(attachments)
+                .concatMap(attachment -> attachment.asMimePart(textExtractor))
+                .collectList()
+                .flatMap(attachments -> extractText(textExtractor)
+                    .map(Optional::ofNullable)
+                    .switchIfEmpty(Mono.just(Optional.empty()))
+                    .onErrorResume(e -> {
+                        LOGGER.warn("Failure extracting text message for some 
attachments", e);
+                        return Mono.just(Optional.empty());
+                    })
+                    .map(text -> new MimePart(headerCollection, 
text.flatMap(ParsedContent::getTextualContent),
+                        mediaType, subType, fileName, fileExtension, 
contentDisposition, attachments)));
+        }
+
+        private Mono<ParsedContent> extractText(TextExtractor textExtractor) {
+            if (bodyContent.isEmpty()) {
+                return Mono.empty();
+            }
             if (shouldPerformTextExtraction()) {
-                return textExtractor.extractContent(
-                    bodyContent,
+                return textExtractor.extractContentReactive(
+                    new ByteArrayInputStream(bodyContent.get()),
                     computeContentType().orElse(null));
             }
-            return new ParsedContent(
-                Optional.ofNullable(IOUtils.toString(bodyContent, 
charset.orElse(StandardCharsets.UTF_8))),
-                ImmutableMap.of());
+            return Mono.fromCallable(() -> new ParsedContent(
+                Optional.ofNullable(IOUtils.toString(new 
ByteArrayInputStream(bodyContent.get()), 
charset.orElse(StandardCharsets.UTF_8))),
+                ImmutableMap.of()));
         }
 
         private boolean shouldPerformTextExtraction() {
@@ -185,7 +224,6 @@ public class MimePart {
                 return Optional.empty();
             }
         }
-
     }
     
     public static Builder builder() {
diff --git 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java
 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java
index b5083f5377..d3d88c4f67 100644
--- 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java
+++ 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java
@@ -29,7 +29,7 @@ import org.apache.james.mime4j.stream.Field;
 
 public interface MimePartContainerBuilder {
 
-    MimePart build();
+    MimePart.ParsedMimePart build();
 
     MimePartContainerBuilder using(TextExtractor textExtractor);
 
@@ -37,7 +37,7 @@ public interface MimePartContainerBuilder {
 
     MimePartContainerBuilder addBodyContent(InputStream bodyContent);
 
-    MimePartContainerBuilder addChild(MimePart mimePart);
+    MimePartContainerBuilder addChild(MimePart.ParsedMimePart mimePart);
 
     MimePartContainerBuilder addFileName(String fileName);
 
diff --git 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java
 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java
index f5091f3201..81a1a15e8a 100644
--- 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java
+++ 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java
@@ -50,7 +50,7 @@ public class MimePartParser {
     private final TextExtractor textExtractor;
     private final MimeTokenStream stream;
     private final Deque<MimePartContainerBuilder> builderStack;
-    private MimePart result;
+    private MimePart.ParsedMimePart result;
     private MimePartContainerBuilder currentlyBuildMimePart;
 
     public MimePartParser(Message message, TextExtractor textExtractor) {
@@ -63,7 +63,7 @@ public class MimePartParser {
             new DefaultBodyDescriptorBuilder(null, FIELD_PARSER, 
DecodeMonitor.SILENT));
     }
 
-    public MimePart parse() throws IOException, MimeException {
+    public MimePart.ParsedMimePart parse() throws IOException, MimeException {
         stream.parse(message.getFullContent());
         for (EntityState state = stream.getState(); state != 
EntityState.T_END_OF_STREAM; state = stream.next()) {
             processMimePart(stream, state);
@@ -107,7 +107,7 @@ public class MimePartParser {
     }
     
     private void closeMimePart() {
-        MimePart bodyMimePart = 
currentlyBuildMimePart.using(textExtractor).build();
+        MimePart.ParsedMimePart bodyMimePart = 
currentlyBuildMimePart.using(textExtractor).build();
         if (!builderStack.isEmpty()) {
             builderStack.peek().addChild(bodyMimePart);
         } else {
diff --git 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java
 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java
index 48fadf28f5..93755a6790 100644
--- 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java
+++ 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java
@@ -33,10 +33,10 @@ public class RootMimePartContainerBuilder implements 
MimePartContainerBuilder {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RootMimePartContainerBuilder.class);
 
-    private MimePart rootMimePart;
+    private MimePart.ParsedMimePart rootMimePart;
 
     @Override
-    public MimePart build() {
+    public MimePart.ParsedMimePart build() {
         return rootMimePart;
     }
 
@@ -57,7 +57,7 @@ public class RootMimePartContainerBuilder implements 
MimePartContainerBuilder {
     }
 
     @Override
-    public MimePartContainerBuilder addChild(MimePart mimePart) {
+    public MimePartContainerBuilder addChild(MimePart.ParsedMimePart mimePart) 
{
         if (rootMimePart == null) {
             rootMimePart = mimePart;
         } else {
diff --git 
a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java
 
b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java
index d36b9691c7..62e4d85871 100644
--- 
a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java
+++ 
b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java
@@ -56,6 +56,8 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
+import reactor.core.publisher.Mono;
+
 class IndexableMessageTest {
     static final MessageUid MESSAGE_UID = MessageUid.of(154);
 
@@ -108,7 +110,8 @@ class IndexableMessageTest {
                 .extractor(new DefaultTextExtractor())
                 .zoneId(ZoneId.of("Europe/Paris"))
                 .indexAttachments(IndexAttachments.YES)
-                .build();
+                .build()
+                .block();
 
         // Then
         assertThat(indexableMessage.getHasAttachment()).isTrue();
@@ -140,7 +143,8 @@ class IndexableMessageTest {
                 .extractor(new DefaultTextExtractor())
                 .zoneId(ZoneId.of("Europe/Paris"))
                 .indexAttachments(IndexAttachments.NO)
-                .build();
+                .build()
+                .block();
 
         // Then
         assertThat(indexableMessage.getHasAttachment()).isFalse();
@@ -170,7 +174,8 @@ class IndexableMessageTest {
                 .extractor(new DefaultTextExtractor())
                 .zoneId(ZoneId.of("Europe/Paris"))
                 .indexAttachments(IndexAttachments.NO)
-                .build();
+                .build()
+                .block();
 
         // Then
         assertThat(indexableMessage.getAttachments()).isEmpty();
@@ -200,7 +205,8 @@ class IndexableMessageTest {
                 .extractor(new DefaultTextExtractor())
                 .zoneId(ZoneId.of("Europe/Paris"))
                 .indexAttachments(IndexAttachments.YES)
-                .build();
+                .build()
+                .block();
 
         // Then
         assertThat(indexableMessage.getAttachments()).isNotEmpty();
@@ -226,10 +232,10 @@ class IndexableMessageTest {
             .thenReturn(MESSAGE_UID);
 
         TextExtractor textExtractor = mock(TextExtractor.class);
-        when(textExtractor.extractContent(any(), any()))
-            .thenReturn(new ParsedContent(Optional.of("first attachment 
content"), ImmutableMap.of()))
-            .thenThrow(new RuntimeException("second cannot be parsed"))
-            .thenReturn(new ParsedContent(Optional.of("third attachment 
content"), ImmutableMap.of()));
+        when(textExtractor.extractContentReactive(any(), any()))
+            .thenReturn(Mono.just(new ParsedContent(Optional.of("first 
attachment content"), ImmutableMap.of())))
+            .thenReturn(Mono.error(new RuntimeException("second cannot be 
parsed")))
+            .thenReturn(Mono.just(new ParsedContent(Optional.of("third 
attachment content"), ImmutableMap.of())));
 
         // When
         IndexableMessage indexableMessage = IndexableMessage.builder()
@@ -237,7 +243,8 @@ class IndexableMessageTest {
                 .extractor(textExtractor)
                 .zoneId(ZoneId.of("Europe/Paris"))
                 .indexAttachments(IndexAttachments.YES)
-                .build();
+                .build()
+                .block();
 
         // Then
         String NO_TEXTUAL_BODY = "The textual body is not present";
@@ -273,7 +280,8 @@ class IndexableMessageTest {
                 .extractor(textExtractor)
                 .zoneId(ZoneId.of("Europe/Paris"))
                 .indexAttachments(IndexAttachments.YES)
-                .build();
+                .build()
+                .block();
 
         // Then
         assertThat(indexableMessage.getMessageId()).isNull();
@@ -306,7 +314,8 @@ class IndexableMessageTest {
             .extractor(textExtractor)
             .zoneId(ZoneId.of("Europe/Paris"))
             .indexAttachments(IndexAttachments.YES)
-            .build();
+            .build()
+            .block();
 
         // Then
         assertThat(indexableMessage.getThreadId()).isNull();
@@ -336,7 +345,8 @@ class IndexableMessageTest {
                 .extractor(textExtractor)
                 .zoneId(ZoneId.of("Europe/Paris"))
                 .indexAttachments(IndexAttachments.YES)
-                .build();
+                .build()
+                .block();
 
         // Then
         assertThat(indexableMessage.getMessageId()).isNull();
@@ -368,7 +378,8 @@ class IndexableMessageTest {
             .extractor(textExtractor)
             .zoneId(ZoneId.of("Europe/Paris"))
             .indexAttachments(IndexAttachments.YES)
-            .build();
+            .build()
+            .block();
 
         // Then
         assertThat(indexableMessage.getThreadId()).isNull();
@@ -401,7 +412,8 @@ class IndexableMessageTest {
             .extractor(new DefaultTextExtractor())
             .zoneId(ZoneId.of("Europe/Paris"))
             .indexAttachments(IndexAttachments.NO)
-            .build();
+            .build()
+            .block();
 
         // Then
         assertThat(indexableMessage.getThreadId()).isEqualTo("42");
diff --git 
a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJsonTest.java
 
b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJsonTest.java
index e538721925..e9ac8a028e 100644
--- 
a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJsonTest.java
+++ 
b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJsonTest.java
@@ -107,7 +107,7 @@ class MessageToElasticSearchJsonTest {
                 MAILBOX_ID);
         spamMail.setUid(UID);
         spamMail.setModSeq(MOD_SEQ);
-        assertThatJson(messageToElasticSearchJson.convertToJson(spamMail))
+        
assertThatJson(messageToElasticSearchJson.convertToJson(spamMail).block())
             .when(IGNORING_ARRAY_ORDER)
             
.isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/spamMail.json"));
     }
@@ -129,7 +129,7 @@ class MessageToElasticSearchJsonTest {
         mail.setUid(UID);
         mail.setModSeq(MOD_SEQ);
 
-        assertThatJson(messageToElasticSearchJson.convertToJson(mail))
+        assertThatJson(messageToElasticSearchJson.convertToJson(mail).block())
             .when(IGNORING_ARRAY_ORDER)
             
.isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/inlined-mixed.json"));
     }
@@ -151,7 +151,7 @@ class MessageToElasticSearchJsonTest {
         spamMail.setUid(UID);
         spamMail.setModSeq(MOD_SEQ);
 
-        String actual = messageToElasticSearchJson.convertToJson(spamMail);
+        String actual = 
messageToElasticSearchJson.convertToJson(spamMail).block();
         assertThatJson(actual)
             .when(IGNORING_ARRAY_ORDER)
             
.isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/invalidCharset.json"));
@@ -173,7 +173,7 @@ class MessageToElasticSearchJsonTest {
                 MAILBOX_ID);
         htmlMail.setModSeq(MOD_SEQ);
         htmlMail.setUid(UID);
-        assertThatJson(messageToElasticSearchJson.convertToJson(htmlMail))
+        
assertThatJson(messageToElasticSearchJson.convertToJson(htmlMail).block())
             .when(IGNORING_ARRAY_ORDER)
             
.isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/htmlMail.json"));
     }
@@ -194,7 +194,7 @@ class MessageToElasticSearchJsonTest {
                 MAILBOX_ID);
         pgpSignedMail.setModSeq(MOD_SEQ);
         pgpSignedMail.setUid(UID);
-        assertThatJson(messageToElasticSearchJson.convertToJson(pgpSignedMail))
+        
assertThatJson(messageToElasticSearchJson.convertToJson(pgpSignedMail).block())
             .when(IGNORING_ARRAY_ORDER)
             
.isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/pgpSignedMail.json"));
     }
@@ -215,7 +215,7 @@ class MessageToElasticSearchJsonTest {
                 MAILBOX_ID);
         mail.setModSeq(MOD_SEQ);
         mail.setUid(UID);
-        assertThatJson(messageToElasticSearchJson.convertToJson(mail))
+        assertThatJson(messageToElasticSearchJson.convertToJson(mail).block())
             .when(IGNORING_ARRAY_ORDER).when(IGNORING_VALUES)
             
.isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/mail.json"));
     }
@@ -236,7 +236,7 @@ class MessageToElasticSearchJsonTest {
                 MAILBOX_ID);
         recursiveMail.setModSeq(MOD_SEQ);
         recursiveMail.setUid(UID);
-        assertThatJson(messageToElasticSearchJson.convertToJson(recursiveMail))
+        
assertThatJson(messageToElasticSearchJson.convertToJson(recursiveMail).block())
             .when(IGNORING_ARRAY_ORDER).when(IGNORING_VALUES)
             
.isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/recursiveMail.json"));
     }
@@ -257,7 +257,7 @@ class MessageToElasticSearchJsonTest {
                 MAILBOX_ID);
         mailWithNoInternalDate.setModSeq(MOD_SEQ);
         mailWithNoInternalDate.setUid(UID);
-        
assertThatJson(messageToElasticSearchJson.convertToJson(mailWithNoInternalDate))
+        
assertThatJson(messageToElasticSearchJson.convertToJson(mailWithNoInternalDate).block())
             .when(IGNORING_ARRAY_ORDER)
             .when(IGNORING_VALUES)
             
.isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/recursiveMail.json"));
@@ -283,7 +283,7 @@ class MessageToElasticSearchJsonTest {
             new DefaultTextExtractor(),
             ZoneId.of("Europe/Paris"),
             IndexAttachments.YES);
-        String convertToJson = 
messageToElasticSearchJson.convertToJson(mailWithNoInternalDate);
+        String convertToJson = 
messageToElasticSearchJson.convertToJson(mailWithNoInternalDate).block();
 
         // Then
         assertThatJson(convertToJson)
@@ -312,7 +312,7 @@ class MessageToElasticSearchJsonTest {
             new DefaultTextExtractor(),
             ZoneId.of("Europe/Paris"),
             IndexAttachments.NO);
-        String convertToJson = 
messageToElasticSearchJson.convertToJson(mailWithNoInternalDate);
+        String convertToJson = 
messageToElasticSearchJson.convertToJson(mailWithNoInternalDate).block();
 
         // Then
         assertThatJson(convertToJson)
@@ -388,7 +388,7 @@ class MessageToElasticSearchJsonTest {
         spamMail.setUid(UID);
         spamMail.setModSeq(MOD_SEQ);
 
-        assertThatJson(messageToElasticSearchJson.convertToJson(spamMail))
+        
assertThatJson(messageToElasticSearchJson.convertToJson(spamMail).block())
             .when(IGNORING_ARRAY_ORDER)
             .isEqualTo(
                 
ClassLoaderUtils.getSystemResourceAsString("eml/nonTextual.json", 
StandardCharsets.UTF_8));
@@ -414,7 +414,7 @@ class MessageToElasticSearchJsonTest {
                 new DefaultTextExtractor(),
                 ZoneId.of("Europe/Paris"),
                 IndexAttachments.NO);
-        String convertToJsonWithoutAttachment = 
messageToElasticSearchJson.convertToJsonWithoutAttachment(message);
+        String convertToJsonWithoutAttachment = 
messageToElasticSearchJson.convertToJsonWithoutAttachment(message).block();
 
         // Then
         assertThatJson(convertToJsonWithoutAttachment)
@@ -443,9 +443,7 @@ class MessageToElasticSearchJsonTest {
                 new JsoupTextExtractor(),
                 ZoneId.of("Europe/Paris"),
                 IndexAttachments.NO);
-        String convertToJsonWithoutAttachment = 
messageToElasticSearchJson.convertToJsonWithoutAttachment(message);
-
-        System.out.println(convertToJsonWithoutAttachment);
+        String convertToJsonWithoutAttachment = 
messageToElasticSearchJson.convertToJsonWithoutAttachment(message).block();
 
         // Then
         assertThatJson(convertToJsonWithoutAttachment)
diff --git 
a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java
 
b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java
index 7a0112ed4f..f2afdd0793 100644
--- 
a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java
+++ 
b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.io.ByteArrayInputStream;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.james.mailbox.extractor.ParsedContent;
 import org.apache.james.mailbox.model.ContentType.MediaType;
 import org.apache.james.mailbox.model.ContentType.SubType;
 import org.junit.jupiter.api.Test;
@@ -45,7 +46,9 @@ class MimePartTest {
             .addBodyContent(new 
ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)))
             .addMediaType(MediaType.of("text"))
             .addSubType(SubType.of("plain"))
-            .build();
+            .build()
+            .asMimePart((in, contentType) -> ParsedContent.empty())
+            .block();
 
         assertThat(mimePart.getTextualBody()).contains(body);
     }
diff --git a/mailbox/tika/pom.xml b/mailbox/tika/pom.xml
index 0abe712a88..567bd92787 100644
--- a/mailbox/tika/pom.xml
+++ b/mailbox/tika/pom.xml
@@ -63,10 +63,18 @@
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.projectreactor.netty</groupId>
+            <artifactId>reactor-netty</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
diff --git 
a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/CachingTextExtractor.java
 
b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/CachingTextExtractor.java
index 30cd94f35b..562fa60cc8 100644
--- 
a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/CachingTextExtractor.java
+++ 
b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/CachingTextExtractor.java
@@ -20,11 +20,10 @@
 package org.apache.james.mailbox.tika;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.time.Duration;
 import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.mailbox.extractor.ParsedContent;
@@ -34,17 +33,18 @@ import org.apache.james.metrics.api.GaugeRegistry;
 import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 
+import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Weigher;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.Weigher;
 import com.google.common.hash.Hashing;
-import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import reactor.core.publisher.Mono;
 
 public class CachingTextExtractor implements TextExtractor {
     private final TextExtractor underlying;
-    private final Cache<String, ParsedContent> cache;
+    private final AsyncCache<String, ParsedContent> cache;
     private final Metric weightMetric;
 
     public CachingTextExtractor(TextExtractor underlying, Duration 
cacheEvictionPeriod, Long cacheWeightInBytes,
@@ -52,20 +52,22 @@ public class CachingTextExtractor implements TextExtractor {
         this.underlying = underlying;
         this.weightMetric = 
metricFactory.generate("textExtractor.cache.weight");
 
-        Weigher<String, ParsedContent> weigher =
-            (key, parsedContent) -> computeWeight(parsedContent);
         RemovalListener<String, ParsedContent> removalListener =
-            notification -> Optional.ofNullable(notification.getValue())
+            (key, value, removalCause) -> Optional.ofNullable(value)
                 .map(this::computeWeight)
                 .ifPresent(weightMetric::remove);
 
-        this.cache = CacheBuilder.newBuilder()
-            .expireAfterAccess(cacheEvictionPeriod.toMillis(), 
TimeUnit.MILLISECONDS)
+        Weigher<String, ParsedContent> weigher =
+            (key, parsedContent) -> computeWeight(parsedContent);
+
+        cache = Caffeine.newBuilder()
+            
.expireAfterAccess(Duration.ofMillis(cacheEvictionPeriod.toMillis()))
             .maximumWeight(cacheWeightInBytes)
             .weigher(weigher)
+            .evictionListener(removalListener)
             .recordStats()
-            .removalListener(removalListener)
-            .build();
+            .buildAsync();
+
         recordStats(gaugeRegistry);
     }
 
@@ -73,28 +75,28 @@ public class CachingTextExtractor implements TextExtractor {
         gaugeRegistry
             .register(
                 "textExtractor.cache.hit.rate",
-                () -> cache.stats().hitRate())
+                () -> cache.synchronous().stats().hitRate())
             .register(
                 "textExtractor.cache.hit.count",
-                () -> cache.stats().hitCount());
+                () -> cache.synchronous().stats().hitCount());
             gaugeRegistry.register(
                 "textExtractor.cache.load.count",
-                () -> cache.stats().loadCount())
+                () -> cache.synchronous().stats().loadCount())
             .register(
                 "textExtractor.cache.eviction.count",
-                () -> cache.stats().evictionCount())
+                () -> cache.synchronous().stats().evictionCount())
             .register(
                 "textExtractor.cache.load.exception.rate",
-                () -> cache.stats().loadExceptionRate())
+                () -> cache.synchronous().stats().loadFailureRate())
             .register(
                 "textExtractor.cache.load.miss.rate",
-                () -> cache.stats().missRate())
+                () -> cache.synchronous().stats().missRate())
             .register(
                 "textExtractor.cache.load.miss.count",
-                () -> cache.stats().missCount())
+                () -> cache.synchronous().stats().missCount())
             .register(
                 "textExtractor.cache.size",
-                cache::size);
+                cache.synchronous()::estimatedSize);
     }
 
     private int computeWeight(ParsedContent parsedContent) {
@@ -109,21 +111,25 @@ public class CachingTextExtractor implements 
TextExtractor {
     }
 
     @Override
-    public ParsedContent extractContent(InputStream inputStream, ContentType 
contentType) throws Exception {
-        byte[] bytes = IOUtils.toByteArray(inputStream);
-        String key = Hashing.sha256().hashBytes(bytes).toString();
-
+    public Mono<ParsedContent> extractContentReactive(InputStream inputStream, 
ContentType contentType) {
         try {
-            return cache.get(key, () -> retrieveAndUpdateWeight(bytes, 
contentType));
-        } catch (ExecutionException | UncheckedExecutionException e) {
-            throw unwrap(e);
+            byte[] bytes = IOUtils.toByteArray(inputStream);
+            String key = Hashing.sha256().hashBytes(bytes).toString();
+            // The SHA-256 of the content is the cache key; on a miss the wrapped extractor is invoked.
+            return Mono.fromFuture(cache.get(key, (a, b) -> 
retrieveAndUpdateWeight(bytes, contentType).toFuture()));
+        } catch (IOException e) {
+            return Mono.error(new RuntimeException(e)); // surfaced through the Mono, not thrown at assembly time
         }
     }
 
-    private ParsedContent retrieveAndUpdateWeight(byte[] bytes, ContentType 
contentType) throws Exception {
-        ParsedContent parsedContent = underlying.extractContent(new 
ByteArrayInputStream(bytes), contentType);
-        weightMetric.add(computeWeight(parsedContent));
-        return parsedContent;
+    @Override
+    public ParsedContent extractContent(InputStream inputStream, ContentType 
contentType) {
+        return extractContentReactive(inputStream, contentType).block();
+    }
+
+    private Mono<ParsedContent> retrieveAndUpdateWeight(byte[] bytes, 
ContentType contentType) {
+        return underlying.extractContentReactive(new 
ByteArrayInputStream(bytes), contentType)
+            .doOnNext(next -> weightMetric.add(computeWeight(next)));
     }
 
     private Exception unwrap(Exception e) {
@@ -135,6 +141,6 @@ public class CachingTextExtractor implements TextExtractor {
 
     @VisibleForTesting
     long size() {
-        return cache.size();
+        return cache.synchronous().estimatedSize();
     }
 }
diff --git 
a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java
 
b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java
index d846b75268..63cf46ee3f 100644
--- 
a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java
+++ 
b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java
@@ -28,6 +28,8 @@ import org.apache.james.mailbox.model.ContentType.MimeType;
 
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Mono;
+
 public class ContentTypeFilteringTextExtractor implements TextExtractor {
 
     private final TextExtractor textExtractor;
@@ -46,6 +48,18 @@ public class ContentTypeFilteringTextExtractor implements 
TextExtractor {
         return textExtractor.extractContent(inputStream, contentType);
     }
 
+    /**
+     * Reactive variant of the content type filter: blacklisted mime types yield an empty
+     * {@link ParsedContent} without consulting the wrapped extractor.
+     */
+    @Override
+    public Mono<ParsedContent> extractContentReactive(InputStream inputStream, ContentType contentType) {
+        MimeType mimeType = contentType.mimeType();
+        return isBlacklisted(mimeType)
+            ? Mono.just(ParsedContent.empty())
+            : textExtractor.extractContentReactive(inputStream, contentType);
+    }
+
     private boolean isBlacklisted(MimeType contentType) {
         return contentTypeBlacklist.contains(contentType);
     }
diff --git 
a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClient.java 
b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClient.java
index ceae8ffd33..434a109598 100644
--- 
a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClient.java
+++ 
b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClient.java
@@ -19,11 +19,12 @@
 package org.apache.james.mailbox.tika;
 
 import java.io.InputStream;
-import java.util.Optional;
 
 import org.apache.james.mailbox.model.ContentType;
 
+import reactor.core.publisher.Mono;
+
 public interface TikaHttpClient {
 
-    Optional<InputStream> recursiveMetaDataAsJson(InputStream inputStream, 
ContentType contentType);
+    Mono<InputStream> recursiveMetaDataAsJson(InputStream inputStream, 
ContentType contentType);
 }
diff --git 
a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClientImpl.java
 
b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClientImpl.java
index 5f00ca9efd..e7afed7fb7 100644
--- 
a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClientImpl.java
+++ 
b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClientImpl.java
@@ -18,19 +18,24 @@
  ****************************************************************/
 package org.apache.james.mailbox.tika;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.Charset;
-import java.util.Optional;
+import java.time.Duration;
 
-import org.apache.http.client.fluent.Request;
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.entity.ContentType;
+import org.apache.james.util.ReactorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.netty.http.client.HttpClient;
+
 public class TikaHttpClientImpl implements TikaHttpClient {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TikaHttpClientImpl.class);
@@ -38,10 +43,14 @@ public class TikaHttpClientImpl implements TikaHttpClient {
 
     private final TikaConfiguration tikaConfiguration;
     private final URI recursiveMetaData;
+    private final HttpClient httpClient;
 
     public TikaHttpClientImpl(TikaConfiguration tikaConfiguration) throws 
URISyntaxException {
         this.tikaConfiguration = tikaConfiguration;
         this.recursiveMetaData = 
buildURI(tikaConfiguration).resolve(RECURSIVE_METADATA_AS_TEXT_ENDPOINT);
+
+        httpClient = HttpClient.create()
+            
.responseTimeout(Duration.ofMillis(tikaConfiguration.getTimeoutInMillis()));
     }
 
     private URI buildURI(TikaConfiguration tikaConfiguration) throws 
URISyntaxException {
@@ -53,23 +62,40 @@ public class TikaHttpClientImpl implements TikaHttpClient {
     }
 
+    /**
+     * Sends the content to the Tika recursive metadata endpoint with an HTTP PUT.
+     *
+     * @param inputStream content to analyse, streamed as the request body
+     * @param contentType mime type and optional charset, forwarded as the Content-Type header
+     * @return the response body when Tika answers with status 200; an empty Mono, after
+     *         logging a warning, for any other status or when the call fails
+     */
     @Override
-    public Optional<InputStream> recursiveMetaDataAsJson(InputStream 
inputStream, org.apache.james.mailbox.model.ContentType contentType) {
-        try {
-            ContentType httpContentType = 
ContentType.create(contentType.mimeType().asString(),
-                contentType.charset()
-                    .map(Charset::name)
-                    .orElse(null));
-            return Optional.ofNullable(
-                    Request.Put(recursiveMetaData)
-                        .socketTimeout(tikaConfiguration.getTimeoutInMillis())
-                        .bodyStream(inputStream, httpContentType)
-                        .execute()
-                        .returnContent()
-                        .asStream());
-        } catch (IOException e) {
-            LOGGER.warn("Failing to call Tika for content type {}", 
contentType, e);
-            return Optional.empty();
-        }
+    public Mono<InputStream> recursiveMetaDataAsJson(InputStream inputStream, 
org.apache.james.mailbox.model.ContentType contentType) {
+        ContentType httpContentType = 
ContentType.create(contentType.mimeType().asString(),
+            contentType.charset()
+                .map(Charset::name)
+                .orElse(null));
+
+        return httpClient
+            .headers(headers -> headers.set(HttpHeaderNames.CONTENT_TYPE, 
httpContentType.toString()))
+            .put()
+            .uri(recursiveMetaData)
+            // InputStream reads block: subscribe on boundedElastic (elastic() is deprecated and unbounded).
+            .send(ReactorUtils.toChunks(inputStream, 16 * 1024)
+                .map(Unpooled::wrappedBuffer)
+                .subscribeOn(Schedulers.boundedElastic()))
+            .responseSingle((resp, content) -> {
+                if (resp.status().code() == 200) {
+                    return content.asInputStream();
+                } else {
+                    LOGGER.warn("Failing to call Tika for content type {} 
status {}", contentType, resp.status().code());
+                    return Mono.empty();
+                }
+            })
+            .onErrorResume(e -> {
+                LOGGER.warn("Failing to call Tika for content type {}", 
contentType, e);
+                return Mono.empty();
+            });
     }
 
 }
diff --git 
a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaTextExtractor.java
 
b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaTextExtractor.java
index b7fc39fa50..8c639096b7 100644
--- 
a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaTextExtractor.java
+++ 
b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaTextExtractor.java
@@ -52,6 +52,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
+import reactor.core.publisher.Mono;
+
 public class TikaTextExtractor implements TextExtractor {
     private static final ContentType.MediaType TEXT = 
ContentType.MediaType.of("text");
 
@@ -77,24 +79,30 @@ public class TikaTextExtractor implements TextExtractor {
     }
 
     @Override
-    public ParsedContent extractContent(InputStream inputStream, ContentType 
contentType) throws Exception {
+    public Mono<ParsedContent> extractContentReactive(InputStream inputStream, 
ContentType contentType) {
         if (contentType.mediaType().equals(TEXT)) {
-            return jsoupTextExtractor.extractContent(inputStream, contentType);
+            return jsoupTextExtractor.extractContentReactive(inputStream, 
contentType);
         }
-        return 
metricFactory.decorateSupplierWithTimerMetric("tikaTextExtraction", 
Throwing.supplier(
-            () -> performContentExtraction(inputStream, contentType))
-            .sneakyThrow());
+        return 
Mono.from(metricFactory.decoratePublisherWithTimerMetric("tikaTextExtraction",
+            performContentExtraction(inputStream, contentType)));
+    }
+
+    @Override
+    public ParsedContent extractContent(InputStream inputStream, ContentType 
contentType) throws Exception {
+        return extractContentReactive(inputStream, contentType)
+            .block();
     }
 
-    public ParsedContent performContentExtraction(InputStream inputStream, 
ContentType contentType) throws IOException {
-        ContentAndMetadata contentAndMetadata = 
convert(tikaHttpClient.recursiveMetaDataAsJson(inputStream, contentType));
-        return new ParsedContent(contentAndMetadata.getContent(), 
contentAndMetadata.getMetadata());
+    public Mono<ParsedContent> performContentExtraction(InputStream 
inputStream, ContentType contentType) {
+        Mono<ContentAndMetadata> contentAndMetadata = 
convert(tikaHttpClient.recursiveMetaDataAsJson(inputStream, contentType));
+        return contentAndMetadata
+            .map(result -> new ParsedContent(result.getContent(), 
result.getMetadata()));
     }
 
-    private ContentAndMetadata convert(Optional<InputStream> maybeInputStream) 
throws IOException {
+    private Mono<ContentAndMetadata> convert(Mono<InputStream> 
maybeInputStream) {
         return maybeInputStream
                 .map(Throwing.function(inputStream -> 
objectMapper.readValue(inputStream, ContentAndMetadata.class)))
-                .orElse(ContentAndMetadata.empty());
+                .defaultIfEmpty(ContentAndMetadata.empty()); // no response stream: fall back to an empty result
     }
 
     @VisibleForTesting
diff --git 
a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/CachingTextExtractorTest.java
 
b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/CachingTextExtractorTest.java
index 6722733e87..54cea13a42 100644
--- 
a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/CachingTextExtractorTest.java
+++ 
b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/CachingTextExtractorTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.mailbox.tika;
 
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -54,7 +53,9 @@ import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 
-public class CachingTextExtractorTest {
+import reactor.core.publisher.Mono;
+
+class CachingTextExtractorTest {
 
     private static final ParsedContent RESULT = new 
ParsedContent(Optional.of("content"), ImmutableMap.of());
     public static final String BIG_STRING = Strings.repeat("0123456789", 103 * 
1024);
@@ -69,7 +70,7 @@ public class CachingTextExtractorTest {
     private TextExtractor wrappedTextExtractor;
 
     @BeforeEach
-    void setUp() throws Exception {
+    void setUp() {
         wrappedTextExtractor = mock(TextExtractor.class);
         textExtractor = new CachingTextExtractor(wrappedTextExtractor,
             TikaConfiguration.DEFAULT_CACHE_EVICTION_PERIOD,
@@ -77,15 +78,15 @@ public class CachingTextExtractorTest {
             new RecordingMetricFactory(),
             new NoopGaugeRegistry());
 
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenReturn(RESULT);
+        when(wrappedTextExtractor.extractContentReactive(any(), any()))
+            .thenReturn(Mono.just(RESULT));
     }
 
     @Test
     void extractContentShouldCallUnderlyingTextExtractor() throws Exception {
         textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE);
 
-        verify(wrappedTextExtractor, times(1)).extractContent(any(), any());
+        verify(wrappedTextExtractor, times(1)).extractContentReactive(any(), 
any());
         verifyNoMoreInteractions(wrappedTextExtractor);
     }
 
@@ -94,79 +95,30 @@ public class CachingTextExtractorTest {
         textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE);
         textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE);
 
-        verify(wrappedTextExtractor, times(1)).extractContent(any(), any());
+        verify(wrappedTextExtractor, times(1)).extractContentReactive(any(), 
any());
         verifyNoMoreInteractions(wrappedTextExtractor);
     }
 
     @Test
-    void extractContentShouldPropagateCheckedException() throws Exception {
+    void extractContentShouldPropagateCheckedException() {
         IOException ioException = new IOException("Any");
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenThrow(ioException);
+        when(wrappedTextExtractor.extractContentReactive(any(), any()))
+            .thenReturn(Mono.error(ioException));
 
         assertThatThrownBy(() -> 
textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE))
-            .isEqualTo(ioException);
+            .hasCause(ioException);
     }
 
     @Test
-    void extractContentShouldPropagateRuntimeException() throws Exception {
+    void extractContentShouldPropagateRuntimeException() {
         RuntimeException runtimeException = new RuntimeException("Any");
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenThrow(runtimeException);
+        when(wrappedTextExtractor.extractContentReactive(any(), any()))
+            .thenReturn(Mono.error(runtimeException));
 
         assertThatThrownBy(() -> 
textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE))
             .isEqualTo(runtimeException);
     }
 
-    @Test
-    void cacheShouldEvictEntriesWhenFull() throws Exception {
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenReturn(_2MiB_RESULT);
-
-        IntStream.range(0, 10)
-            .mapToObj(STREAM_GENERATOR::apply)
-            .forEach(Throwing.consumer(inputStream -> 
textExtractor.extractContent(inputStream, CONTENT_TYPE)));
-
-        assertThat(textExtractor.size())
-            .isLessThanOrEqualTo(5);
-    }
-
-    @Test
-    void olderEntriesShouldBeEvictedFirst() throws Exception {
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenReturn(_2MiB_RESULT);
-
-        IntStream.range(0, 10)
-            .mapToObj(STREAM_GENERATOR::apply)
-            .forEach(Throwing.consumer(inputStream -> 
textExtractor.extractContent(inputStream, CONTENT_TYPE)));
-
-        reset(wrappedTextExtractor);
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenReturn(_2MiB_RESULT);
-
-        textExtractor.extractContent(STREAM_GENERATOR.apply(1), CONTENT_TYPE);
-
-        verify(wrappedTextExtractor).extractContent(any(), any());
-    }
-
-    @Test
-    void youngerEntriesShouldBePreservedByEviction() throws Exception {
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenReturn(_2MiB_RESULT);
-
-        IntStream.range(0, 10)
-            .mapToObj(STREAM_GENERATOR::apply)
-            .forEach(Throwing.consumer(inputStream -> 
textExtractor.extractContent(inputStream, CONTENT_TYPE)));
-
-        reset(wrappedTextExtractor);
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenReturn(_2MiB_RESULT);
-
-        textExtractor.extractContent(STREAM_GENERATOR.apply(9), CONTENT_TYPE);
-
-        verifyZeroInteractions(wrappedTextExtractor);
-    }
-
     @Test
     void frequentlyAccessedEntriesShouldBePreservedByEviction() throws 
Exception {
         when(wrappedTextExtractor.extractContent(any(), any()))
@@ -191,7 +143,7 @@ public class CachingTextExtractorTest {
             .threadCount(10)
             .runSuccessfullyWithin(Duration.ofMinutes(1));
 
-        verify(wrappedTextExtractor, times(1)).extractContent(any(), any());
+        verify(wrappedTextExtractor, times(1)).extractContentReactive(any(), any());
     }
 
 }
\ No newline at end of file
diff --git a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaTextExtractorTest.java b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaTextExtractorTest.java
index e47cee2cee..6c0f76ac0f 100644
--- a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaTextExtractorTest.java
+++ b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaTextExtractorTest.java
@@ -26,7 +26,6 @@ import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
-import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.mailbox.extractor.ParsedContent;
@@ -42,6 +41,8 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.TextNode;
 
+import reactor.core.publisher.Mono;
+
 class TikaTextExtractorTest {
 
     TextExtractor textExtractor;
@@ -170,7 +171,7 @@ class TikaTextExtractorTest {
     void deserializerShouldNotThrowWhenMoreThanOneNode() throws Exception {
         TikaTextExtractor textExtractor = new TikaTextExtractor(
             new RecordingMetricFactory(),
-            (inputStream, contentType) -> Optional.of(new ByteArrayInputStream(("[{\"X-TIKA:content\": \"This is an awesome LibreOffice document !\"}, " +
+            (inputStream, contentType) -> Mono.just(new ByteArrayInputStream(("[{\"X-TIKA:content\": \"This is an awesome LibreOffice document !\"}, " +
                                                             "{\"Chroma BlackIsZero\": \"true\"}]")
                                                         .getBytes(StandardCharsets.UTF_8))));
 
@@ -183,7 +184,7 @@ class TikaTextExtractorTest {
         String expectedExtractedContent = "content A";
         TikaTextExtractor textExtractor = new TikaTextExtractor(
             new RecordingMetricFactory(),
-            (inputStream, contentType) -> Optional.of(new ByteArrayInputStream(("[{\"X-TIKA:content\": \"" + expectedExtractedContent + "\"}, " +
+            (inputStream, contentType) -> Mono.just(new ByteArrayInputStream(("[{\"X-TIKA:content\": \"" + expectedExtractedContent + "\"}, " +
                                                             "{\"X-TIKA:content\": \"content B\"}]")
                                                         .getBytes(StandardCharsets.UTF_8))));
 
@@ -198,7 +199,7 @@ class TikaTextExtractorTest {
     void deserializerShouldThrowWhenNodeIsNotAnObject() {
         TikaTextExtractor textExtractor = new TikaTextExtractor(
             new RecordingMetricFactory(),
-            (inputStream, contentType) -> Optional.of(new ByteArrayInputStream("[\"value1\"]"
+            (inputStream, contentType) -> Mono.just(new ByteArrayInputStream("[\"value1\"]"
                                                         .getBytes(StandardCharsets.UTF_8))));
 
         InputStream inputStream = new ByteArrayInputStream("toto".getBytes(StandardCharsets.UTF_8));
diff --git a/pom.xml b/pom.xml
index dce21c398c..0817a60cf4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2092,6 +2092,11 @@
                 <artifactId>jackson-datatype-jsr310</artifactId>
                 <version>${jackson.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.github.ben-manes.caffeine</groupId>
+                <artifactId>caffeine</artifactId>
+                <version>3.0.5</version>
+            </dependency>
             <dependency>
                 <groupId>com.github.dpaukov</groupId>
                 <artifactId>combinatoricslib3</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to