This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 643cd023a6bbe57094fb03ed55a76f44f1d050bc Author: Benoit Tellier <[email protected]> AuthorDate: Wed Mar 2 14:31:25 2022 +0700 JAMES-3719 Reactive textual content extraction with Apache Tika Tika was called from reactive code and was doing blocking HTTP calls from within the MIME parsing code. This generates: - Unneeded thread consumption, as we have some threads waiting for the Tika response - Potentially dangerous blocking calls: for instance the InVM event bus was doing such calls on the parallel thread pool (where it is critical NOT to block)... - Also the connection was opened on a per-call basis, not being reused. We introduce the following changes: - Reactification of the TextExtractor API - We re-implement the HTTP calls done by TikaTextExtractor with reactor-netty which allows us to pool HTTP connections and do this in a non-blocking reactive fashion. - We provide a reactive cache using the Caffeine caching library - Guava caches are blocking and thus not an option... - We uncouple the text extraction from the MIME parsing phase by introducing an intermediate POJO. Doing so requires us to do a post-parsing copy of content. TODO: only do the copy if necessary. We don't want to copy large attachments for which no text is going to be extracted... - Finally we reactify index content generation for ElasticSearch code. 
--- .../james/mailbox/extractor/TextExtractor.java | 6 + .../ElasticSearchListeningMessageSearchIndex.java | 25 ++-- .../elasticsearch/v7/json/IndexableMessage.java | 138 +++++++++++---------- .../v7/json/MessageToElasticSearchJson.java | 33 ++--- .../mailbox/elasticsearch/v7/json/MimePart.java | 82 ++++++++---- .../v7/json/MimePartContainerBuilder.java | 4 +- .../elasticsearch/v7/json/MimePartParser.java | 6 +- .../v7/json/RootMimePartContainerBuilder.java | 6 +- .../v7/json/IndexableMessageTest.java | 40 +++--- .../v7/json/MessageToElasticSearchJsonTest.java | 28 ++--- .../elasticsearch/v7/json/MimePartTest.java | 5 +- mailbox/tika/pom.xml | 8 ++ .../james/mailbox/tika/CachingTextExtractor.java | 76 ++++++------ .../tika/ContentTypeFilteringTextExtractor.java | 10 ++ .../apache/james/mailbox/tika/TikaHttpClient.java | 5 +- .../james/mailbox/tika/TikaHttpClientImpl.java | 57 ++++++--- .../james/mailbox/tika/TikaTextExtractor.java | 28 +++-- .../mailbox/tika/CachingTextExtractorTest.java | 80 +++--------- .../james/mailbox/tika/TikaTextExtractorTest.java | 9 +- pom.xml | 5 + 20 files changed, 363 insertions(+), 288 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java b/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java index 6b911225b6..2fdf7e555e 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java @@ -23,8 +23,14 @@ import java.io.InputStream; import org.apache.james.mailbox.model.ContentType; +import reactor.core.publisher.Mono; + public interface TextExtractor { ParsedContent extractContent(InputStream inputStream, ContentType contentType) throws Exception; + default Mono<ParsedContent> extractContentReactive(InputStream inputStream, ContentType contentType) { + return Mono.fromCallable(() -> extractContent(inputStream, contentType)); + } + } diff --git 
a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/events/ElasticSearchListeningMessageSearchIndex.java index 591a5ddcb5..29ef16119b 100644 --- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/events/ElasticSearchListeningMessageSearchIndex.java +++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/events/ElasticSearchListeningMessageSearchIndex.java @@ -162,23 +162,22 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe RoutingKey from = routingKeyFactory.from(mailbox.getMailboxId()); DocumentId id = indexIdFor(mailbox.getMailboxId(), message.getUid()); - return Mono.fromCallable(() -> generateIndexedJson(mailbox, message, session)) + return generateIndexedJson(mailbox, message, session) .flatMap(jsonContent -> elasticSearchIndexer.index(id, jsonContent, from)) .then(); } - private String generateIndexedJson(Mailbox mailbox, MailboxMessage message, MailboxSession session) throws JsonProcessingException { - try { - return messageToElasticSearchJson.convertToJson(message); - } catch (Exception e) { - LOGGER.warn("Indexing mailbox {}-{} of user {} on message {} without attachments ", - mailbox.getName(), - mailbox.getMailboxId().serialize(), - session.getUser().asString(), - message.getUid(), - e); - return messageToElasticSearchJson.convertToJsonWithoutAttachment(message); - } + private Mono<String> generateIndexedJson(Mailbox mailbox, MailboxMessage message, MailboxSession session) { + return messageToElasticSearchJson.convertToJson(message) + .onErrorResume(e -> { + LOGGER.warn("Indexing mailbox {}-{} of user {} on message {} without attachments ", + mailbox.getName(), + mailbox.getMailboxId().serialize(), + session.getUser().asString(), + message.getUid(), + e); + return 
messageToElasticSearchJson.convertToJsonWithoutAttachment(message); + }); } @Override diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessage.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessage.java index 072513fbaa..d224b43d56 100644 --- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessage.java +++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessage.java @@ -39,6 +39,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Mono; + public class IndexableMessage { private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ"); @@ -62,7 +64,7 @@ public class IndexableMessage { private Builder() { } - public IndexableMessage build() { + public Mono<IndexableMessage> build() { Preconditions.checkNotNull(message.getMailboxId()); Preconditions.checkNotNull(textExtractor); Preconditions.checkNotNull(indexAttachments); @@ -95,73 +97,77 @@ public class IndexableMessage { return this; } - private IndexableMessage instantiateIndexedMessage() throws IOException, MimeException { + private Mono<IndexableMessage> instantiateIndexedMessage() throws IOException, MimeException { String messageId = SearchUtil.getSerializedMessageIdIfSupportedByUnderlyingStorageOrNull(message); String threadId = SearchUtil.getSerializedThreadIdIfSupportedByUnderlyingStorageOrNull(message); - MimePart parsingResult = new MimePartParser(message, textExtractor).parse(); - - Optional<String> bodyText = parsingResult.locateFirstTextBody(); - Optional<String> bodyHtml = parsingResult.locateFirstHtmlBody(); - - boolean hasAttachment = MessageAttachmentMetadata.hasNonInlinedAttachment(message.getAttachments()); - 
List<MimePart> attachments = setFlattenedAttachments(parsingResult, indexAttachments); - - HeaderCollection headerCollection = parsingResult.getHeaderCollection(); - ZonedDateTime internalDate = getSanitizedInternalDate(message, zoneId); - - List<HeaderCollection.Header> headers = headerCollection.getHeaders(); - Subjects subjects = Subjects.from(headerCollection.getSubjectSet()); - EMailers from = EMailers.from(headerCollection.getFromAddressSet()); - EMailers to = EMailers.from(headerCollection.getToAddressSet()); - EMailers cc = EMailers.from(headerCollection.getCcAddressSet()); - EMailers bcc = EMailers.from(headerCollection.getBccAddressSet()); - String sentDate = DATE_TIME_FORMATTER.format(headerCollection.getSentDate().orElse(internalDate)); - Optional<String> mimeMessageID = headerCollection.getMessageID(); - - long uid = message.getUid().asLong(); - String mailboxId = message.getMailboxId().serialize(); - ModSeq modSeq = message.getModSeq(); - long size = message.getFullContentOctets(); - String date = DATE_TIME_FORMATTER.format(getSanitizedInternalDate(message, zoneId)); - String mediaType = message.getMediaType(); - String subType = message.getSubType(); - boolean isAnswered = message.isAnswered(); - boolean isDeleted = message.isDeleted(); - boolean isDraft = message.isDraft(); - boolean isFlagged = message.isFlagged(); - boolean isRecent = message.isRecent(); - boolean isUnRead = !message.isSeen(); - String[] userFlags = message.createFlags().getUserFlags(); - - return new IndexableMessage( - attachments, - bcc, - bodyHtml, - bodyText, - cc, - date, - from, - hasAttachment, - headers, - isAnswered, - isDeleted, - isDraft, - isFlagged, - isRecent, - isUnRead, - mailboxId, - mediaType, - messageId, - threadId, - modSeq, - sentDate, - size, - subjects, - subType, - to, - uid, - userFlags, - mimeMessageID); + + return new MimePartParser(message, textExtractor).parse() + .asMimePart(textExtractor) + .map(parsingResult -> { + + Optional<String> bodyText = 
parsingResult.locateFirstTextBody(); + Optional<String> bodyHtml = parsingResult.locateFirstHtmlBody(); + + boolean hasAttachment = MessageAttachmentMetadata.hasNonInlinedAttachment(message.getAttachments()); + List<MimePart> attachments = setFlattenedAttachments(parsingResult, indexAttachments); + + HeaderCollection headerCollection = parsingResult.getHeaderCollection(); + ZonedDateTime internalDate = getSanitizedInternalDate(message, zoneId); + + List<HeaderCollection.Header> headers = headerCollection.getHeaders(); + Subjects subjects = Subjects.from(headerCollection.getSubjectSet()); + EMailers from = EMailers.from(headerCollection.getFromAddressSet()); + EMailers to = EMailers.from(headerCollection.getToAddressSet()); + EMailers cc = EMailers.from(headerCollection.getCcAddressSet()); + EMailers bcc = EMailers.from(headerCollection.getBccAddressSet()); + String sentDate = DATE_TIME_FORMATTER.format(headerCollection.getSentDate().orElse(internalDate)); + Optional<String> mimeMessageID = headerCollection.getMessageID(); + + long uid = message.getUid().asLong(); + String mailboxId = message.getMailboxId().serialize(); + ModSeq modSeq = message.getModSeq(); + long size = message.getFullContentOctets(); + String date = DATE_TIME_FORMATTER.format(getSanitizedInternalDate(message, zoneId)); + String mediaType = message.getMediaType(); + String subType = message.getSubType(); + boolean isAnswered = message.isAnswered(); + boolean isDeleted = message.isDeleted(); + boolean isDraft = message.isDraft(); + boolean isFlagged = message.isFlagged(); + boolean isRecent = message.isRecent(); + boolean isUnRead = !message.isSeen(); + String[] userFlags = message.createFlags().getUserFlags(); + + return new IndexableMessage( + attachments, + bcc, + bodyHtml, + bodyText, + cc, + date, + from, + hasAttachment, + headers, + isAnswered, + isDeleted, + isDraft, + isFlagged, + isRecent, + isUnRead, + mailboxId, + mediaType, + messageId, + threadId, + modSeq, + sentDate, + size, + 
subjects, + subType, + to, + uid, + userFlags, + mimeMessageID); + }); } private List<MimePart> setFlattenedAttachments(MimePart parsingResult, IndexAttachments indexAttachments) { diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJson.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJson.java index 6a8168c34c..564d750083 100644 --- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJson.java +++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJson.java @@ -33,8 +33,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.guava.GuavaModule; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.github.fge.lambdas.Throwing; import com.google.common.base.Preconditions; +import reactor.core.publisher.Mono; + public class MessageToElasticSearchJson { private final ObjectMapper mapper; @@ -56,24 +59,26 @@ public class MessageToElasticSearchJson { this(textExtractor, ZoneId.systemDefault(), indexAttachments); } - public String convertToJson(MailboxMessage message) throws JsonProcessingException { + public Mono<String> convertToJson(MailboxMessage message) { Preconditions.checkNotNull(message); - return mapper.writeValueAsString(IndexableMessage.builder() - .message(message) - .extractor(textExtractor) - .zoneId(zoneId) - .indexAttachments(indexAttachments) - .build()); + return IndexableMessage.builder() + .message(message) + .extractor(textExtractor) + .zoneId(zoneId) + .indexAttachments(indexAttachments) + .build() + .map(Throwing.function(mapper::writeValueAsString)); } - public String convertToJsonWithoutAttachment(MailboxMessage message) throws JsonProcessingException { - return 
mapper.writeValueAsString(IndexableMessage.builder() - .message(message) - .extractor(textExtractor) - .zoneId(zoneId) - .indexAttachments(IndexAttachments.NO) - .build()); + public Mono<String> convertToJsonWithoutAttachment(MailboxMessage message) { + return IndexableMessage.builder() + .message(message) + .extractor(textExtractor) + .zoneId(zoneId) + .indexAttachments(IndexAttachments.NO) + .build() + .map(Throwing.function(mapper::writeValueAsString)); } public String getUpdatedJsonMessagePart(Flags flags, ModSeq modSeq) throws JsonProcessingException { diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java index 722bd75756..918306fddf 100644 --- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java +++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java @@ -19,6 +19,7 @@ package org.apache.james.mailbox.elasticsearch.v7.json; +import java.io.ByteArrayInputStream; import java.io.InputStream; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -40,17 +41,21 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.github.fge.lambdas.Throwing; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + public class MimePart { public static class Builder implements MimePartContainerBuilder { private final HeaderCollection.Builder headerCollectionBuilder; private Optional<InputStream> bodyContent; - private final List<MimePart> children; + private final List<ParsedMimePart> children; private Optional<MediaType> mediaType; private Optional<SubType> 
subType; private Optional<String> fileName; @@ -85,7 +90,7 @@ public class MimePart { } @Override - public Builder addChild(MimePart mimePart) { + public Builder addChild(ParsedMimePart mimePart) { children.add(mimePart); return this; } @@ -129,11 +134,11 @@ public class MimePart { } @Override - public MimePart build() { - Optional<ParsedContent> parsedContent = parseContent(textExtractor); - return new MimePart( + public ParsedMimePart build() { + return new ParsedMimePart( headerCollectionBuilder.build(), - parsedContent.flatMap(ParsedContent::getTextualContent), + bodyContent, + charset, mediaType, subType, fileName, @@ -141,27 +146,61 @@ public class MimePart { contentDisposition, children); } + } - private Optional<ParsedContent> parseContent(TextExtractor textExtractor) { - if (bodyContent.isPresent()) { - try { - return Optional.of(extractText(textExtractor, bodyContent.get())); - } catch (Exception e) { - LOGGER.warn("Failed parsing attachment", e); - } - } - return Optional.empty(); + public static class ParsedMimePart { + private final HeaderCollection headerCollection; + private final Optional<byte[]> bodyContent; + private final Optional<Charset> charset; + private final Optional<MediaType> mediaType; + private final Optional<SubType> subType; + private final Optional<String> fileName; + private final Optional<String> fileExtension; + private final Optional<String> contentDisposition; + private final List<ParsedMimePart> attachments; + + public ParsedMimePart(HeaderCollection headerCollection, Optional<InputStream> bodyContent, Optional<Charset> charset, + Optional<MediaType> mediaType, + Optional<SubType> subType, Optional<String> fileName, Optional<String> fileExtension, + Optional<String> contentDisposition, List<ParsedMimePart> attachments) { + this.headerCollection = headerCollection; + this.bodyContent = bodyContent.map(Throwing.function(IOUtils::toByteArray)); + this.mediaType = mediaType; + this.subType = subType; + this.fileName = fileName; + 
this.fileExtension = fileExtension; + this.contentDisposition = contentDisposition; + this.attachments = attachments; + this.charset = charset; } - private ParsedContent extractText(TextExtractor textExtractor, InputStream bodyContent) throws Exception { + public Mono<MimePart> asMimePart(TextExtractor textExtractor) { + return Flux.fromIterable(attachments) + .concatMap(attachment -> attachment.asMimePart(textExtractor)) + .collectList() + .flatMap(attachments -> extractText(textExtractor) + .map(Optional::ofNullable) + .switchIfEmpty(Mono.just(Optional.empty())) + .onErrorResume(e -> { + LOGGER.warn("Failure extracting text message for some attachments", e); + return Mono.just(Optional.empty()); + }) + .map(text -> new MimePart(headerCollection, text.flatMap(ParsedContent::getTextualContent), + mediaType, subType, fileName, fileExtension, contentDisposition, attachments))); + } + + private Mono<ParsedContent> extractText(TextExtractor textExtractor) { + if (bodyContent.isEmpty()) { + return Mono.empty(); + } if (shouldPerformTextExtraction()) { - return textExtractor.extractContent( - bodyContent, + return textExtractor.extractContentReactive( + new ByteArrayInputStream(bodyContent.get()), computeContentType().orElse(null)); } - return new ParsedContent( - Optional.ofNullable(IOUtils.toString(bodyContent, charset.orElse(StandardCharsets.UTF_8))), - ImmutableMap.of()); + return Mono.fromCallable(() -> new ParsedContent( + Optional.ofNullable(IOUtils.toString(new ByteArrayInputStream(bodyContent.get()), charset.orElse(StandardCharsets.UTF_8))), + ImmutableMap.of())); } private boolean shouldPerformTextExtraction() { @@ -185,7 +224,6 @@ public class MimePart { return Optional.empty(); } } - } public static Builder builder() { diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java 
b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java index b5083f5377..d3d88c4f67 100644 --- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java +++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java @@ -29,7 +29,7 @@ import org.apache.james.mime4j.stream.Field; public interface MimePartContainerBuilder { - MimePart build(); + MimePart.ParsedMimePart build(); MimePartContainerBuilder using(TextExtractor textExtractor); @@ -37,7 +37,7 @@ public interface MimePartContainerBuilder { MimePartContainerBuilder addBodyContent(InputStream bodyContent); - MimePartContainerBuilder addChild(MimePart mimePart); + MimePartContainerBuilder addChild(MimePart.ParsedMimePart mimePart); MimePartContainerBuilder addFileName(String fileName); diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java index f5091f3201..81a1a15e8a 100644 --- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java +++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java @@ -50,7 +50,7 @@ public class MimePartParser { private final TextExtractor textExtractor; private final MimeTokenStream stream; private final Deque<MimePartContainerBuilder> builderStack; - private MimePart result; + private MimePart.ParsedMimePart result; private MimePartContainerBuilder currentlyBuildMimePart; public MimePartParser(Message message, TextExtractor textExtractor) { @@ -63,7 +63,7 @@ public class MimePartParser { new DefaultBodyDescriptorBuilder(null, FIELD_PARSER, DecodeMonitor.SILENT)); } - public MimePart parse() throws IOException, MimeException { + 
public MimePart.ParsedMimePart parse() throws IOException, MimeException { stream.parse(message.getFullContent()); for (EntityState state = stream.getState(); state != EntityState.T_END_OF_STREAM; state = stream.next()) { processMimePart(stream, state); @@ -107,7 +107,7 @@ public class MimePartParser { } private void closeMimePart() { - MimePart bodyMimePart = currentlyBuildMimePart.using(textExtractor).build(); + MimePart.ParsedMimePart bodyMimePart = currentlyBuildMimePart.using(textExtractor).build(); if (!builderStack.isEmpty()) { builderStack.peek().addChild(bodyMimePart); } else { diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java index 48fadf28f5..93755a6790 100644 --- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java +++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java @@ -33,10 +33,10 @@ public class RootMimePartContainerBuilder implements MimePartContainerBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(RootMimePartContainerBuilder.class); - private MimePart rootMimePart; + private MimePart.ParsedMimePart rootMimePart; @Override - public MimePart build() { + public MimePart.ParsedMimePart build() { return rootMimePart; } @@ -57,7 +57,7 @@ public class RootMimePartContainerBuilder implements MimePartContainerBuilder { } @Override - public MimePartContainerBuilder addChild(MimePart mimePart) { + public MimePartContainerBuilder addChild(MimePart.ParsedMimePart mimePart) { if (rootMimePart == null) { rootMimePart = mimePart; } else { diff --git a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java 
b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java index d36b9691c7..62e4d85871 100644 --- a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java +++ b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java @@ -56,6 +56,8 @@ import org.junit.jupiter.api.extension.RegisterExtension; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import reactor.core.publisher.Mono; + class IndexableMessageTest { static final MessageUid MESSAGE_UID = MessageUid.of(154); @@ -108,7 +110,8 @@ class IndexableMessageTest { .extractor(new DefaultTextExtractor()) .zoneId(ZoneId.of("Europe/Paris")) .indexAttachments(IndexAttachments.YES) - .build(); + .build() + .block(); // Then assertThat(indexableMessage.getHasAttachment()).isTrue(); @@ -140,7 +143,8 @@ class IndexableMessageTest { .extractor(new DefaultTextExtractor()) .zoneId(ZoneId.of("Europe/Paris")) .indexAttachments(IndexAttachments.NO) - .build(); + .build() + .block(); // Then assertThat(indexableMessage.getHasAttachment()).isFalse(); @@ -170,7 +174,8 @@ class IndexableMessageTest { .extractor(new DefaultTextExtractor()) .zoneId(ZoneId.of("Europe/Paris")) .indexAttachments(IndexAttachments.NO) - .build(); + .build() + .block(); // Then assertThat(indexableMessage.getAttachments()).isEmpty(); @@ -200,7 +205,8 @@ class IndexableMessageTest { .extractor(new DefaultTextExtractor()) .zoneId(ZoneId.of("Europe/Paris")) .indexAttachments(IndexAttachments.YES) - .build(); + .build() + .block(); // Then assertThat(indexableMessage.getAttachments()).isNotEmpty(); @@ -226,10 +232,10 @@ class IndexableMessageTest { .thenReturn(MESSAGE_UID); TextExtractor textExtractor = mock(TextExtractor.class); - when(textExtractor.extractContent(any(), any())) - .thenReturn(new ParsedContent(Optional.of("first attachment content"), 
ImmutableMap.of())) - .thenThrow(new RuntimeException("second cannot be parsed")) - .thenReturn(new ParsedContent(Optional.of("third attachment content"), ImmutableMap.of())); + when(textExtractor.extractContentReactive(any(), any())) + .thenReturn(Mono.just(new ParsedContent(Optional.of("first attachment content"), ImmutableMap.of()))) + .thenReturn(Mono.error(new RuntimeException("second cannot be parsed"))) + .thenReturn(Mono.just(new ParsedContent(Optional.of("third attachment content"), ImmutableMap.of()))); // When IndexableMessage indexableMessage = IndexableMessage.builder() @@ -237,7 +243,8 @@ class IndexableMessageTest { .extractor(textExtractor) .zoneId(ZoneId.of("Europe/Paris")) .indexAttachments(IndexAttachments.YES) - .build(); + .build() + .block(); // Then String NO_TEXTUAL_BODY = "The textual body is not present"; @@ -273,7 +280,8 @@ class IndexableMessageTest { .extractor(textExtractor) .zoneId(ZoneId.of("Europe/Paris")) .indexAttachments(IndexAttachments.YES) - .build(); + .build() + .block(); // Then assertThat(indexableMessage.getMessageId()).isNull(); @@ -306,7 +314,8 @@ class IndexableMessageTest { .extractor(textExtractor) .zoneId(ZoneId.of("Europe/Paris")) .indexAttachments(IndexAttachments.YES) - .build(); + .build() + .block(); // Then assertThat(indexableMessage.getThreadId()).isNull(); @@ -336,7 +345,8 @@ class IndexableMessageTest { .extractor(textExtractor) .zoneId(ZoneId.of("Europe/Paris")) .indexAttachments(IndexAttachments.YES) - .build(); + .build() + .block(); // Then assertThat(indexableMessage.getMessageId()).isNull(); @@ -368,7 +378,8 @@ class IndexableMessageTest { .extractor(textExtractor) .zoneId(ZoneId.of("Europe/Paris")) .indexAttachments(IndexAttachments.YES) - .build(); + .build() + .block(); // Then assertThat(indexableMessage.getThreadId()).isNull(); @@ -401,7 +412,8 @@ class IndexableMessageTest { .extractor(new DefaultTextExtractor()) .zoneId(ZoneId.of("Europe/Paris")) .indexAttachments(IndexAttachments.NO) - 
.build(); + .build() + .block(); // Then assertThat(indexableMessage.getThreadId()).isEqualTo("42"); diff --git a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJsonTest.java b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJsonTest.java index e538721925..e9ac8a028e 100644 --- a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJsonTest.java +++ b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJsonTest.java @@ -107,7 +107,7 @@ class MessageToElasticSearchJsonTest { MAILBOX_ID); spamMail.setUid(UID); spamMail.setModSeq(MOD_SEQ); - assertThatJson(messageToElasticSearchJson.convertToJson(spamMail)) + assertThatJson(messageToElasticSearchJson.convertToJson(spamMail).block()) .when(IGNORING_ARRAY_ORDER) .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/spamMail.json")); } @@ -129,7 +129,7 @@ class MessageToElasticSearchJsonTest { mail.setUid(UID); mail.setModSeq(MOD_SEQ); - assertThatJson(messageToElasticSearchJson.convertToJson(mail)) + assertThatJson(messageToElasticSearchJson.convertToJson(mail).block()) .when(IGNORING_ARRAY_ORDER) .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/inlined-mixed.json")); } @@ -151,7 +151,7 @@ class MessageToElasticSearchJsonTest { spamMail.setUid(UID); spamMail.setModSeq(MOD_SEQ); - String actual = messageToElasticSearchJson.convertToJson(spamMail); + String actual = messageToElasticSearchJson.convertToJson(spamMail).block(); assertThatJson(actual) .when(IGNORING_ARRAY_ORDER) .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/invalidCharset.json")); @@ -173,7 +173,7 @@ class MessageToElasticSearchJsonTest { MAILBOX_ID); htmlMail.setModSeq(MOD_SEQ); htmlMail.setUid(UID); - assertThatJson(messageToElasticSearchJson.convertToJson(htmlMail)) + 
assertThatJson(messageToElasticSearchJson.convertToJson(htmlMail).block()) .when(IGNORING_ARRAY_ORDER) .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/htmlMail.json")); } @@ -194,7 +194,7 @@ class MessageToElasticSearchJsonTest { MAILBOX_ID); pgpSignedMail.setModSeq(MOD_SEQ); pgpSignedMail.setUid(UID); - assertThatJson(messageToElasticSearchJson.convertToJson(pgpSignedMail)) + assertThatJson(messageToElasticSearchJson.convertToJson(pgpSignedMail).block()) .when(IGNORING_ARRAY_ORDER) .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/pgpSignedMail.json")); } @@ -215,7 +215,7 @@ class MessageToElasticSearchJsonTest { MAILBOX_ID); mail.setModSeq(MOD_SEQ); mail.setUid(UID); - assertThatJson(messageToElasticSearchJson.convertToJson(mail)) + assertThatJson(messageToElasticSearchJson.convertToJson(mail).block()) .when(IGNORING_ARRAY_ORDER).when(IGNORING_VALUES) .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/mail.json")); } @@ -236,7 +236,7 @@ class MessageToElasticSearchJsonTest { MAILBOX_ID); recursiveMail.setModSeq(MOD_SEQ); recursiveMail.setUid(UID); - assertThatJson(messageToElasticSearchJson.convertToJson(recursiveMail)) + assertThatJson(messageToElasticSearchJson.convertToJson(recursiveMail).block()) .when(IGNORING_ARRAY_ORDER).when(IGNORING_VALUES) .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/recursiveMail.json")); } @@ -257,7 +257,7 @@ class MessageToElasticSearchJsonTest { MAILBOX_ID); mailWithNoInternalDate.setModSeq(MOD_SEQ); mailWithNoInternalDate.setUid(UID); - assertThatJson(messageToElasticSearchJson.convertToJson(mailWithNoInternalDate)) + assertThatJson(messageToElasticSearchJson.convertToJson(mailWithNoInternalDate).block()) .when(IGNORING_ARRAY_ORDER) .when(IGNORING_VALUES) .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/recursiveMail.json")); @@ -283,7 +283,7 @@ class MessageToElasticSearchJsonTest { new DefaultTextExtractor(), ZoneId.of("Europe/Paris"), IndexAttachments.YES); - String 
convertToJson = messageToElasticSearchJson.convertToJson(mailWithNoInternalDate); + String convertToJson = messageToElasticSearchJson.convertToJson(mailWithNoInternalDate).block(); // Then assertThatJson(convertToJson) @@ -312,7 +312,7 @@ class MessageToElasticSearchJsonTest { new DefaultTextExtractor(), ZoneId.of("Europe/Paris"), IndexAttachments.NO); - String convertToJson = messageToElasticSearchJson.convertToJson(mailWithNoInternalDate); + String convertToJson = messageToElasticSearchJson.convertToJson(mailWithNoInternalDate).block(); // Then assertThatJson(convertToJson) @@ -388,7 +388,7 @@ class MessageToElasticSearchJsonTest { spamMail.setUid(UID); spamMail.setModSeq(MOD_SEQ); - assertThatJson(messageToElasticSearchJson.convertToJson(spamMail)) + assertThatJson(messageToElasticSearchJson.convertToJson(spamMail).block()) .when(IGNORING_ARRAY_ORDER) .isEqualTo( ClassLoaderUtils.getSystemResourceAsString("eml/nonTextual.json", StandardCharsets.UTF_8)); @@ -414,7 +414,7 @@ class MessageToElasticSearchJsonTest { new DefaultTextExtractor(), ZoneId.of("Europe/Paris"), IndexAttachments.NO); - String convertToJsonWithoutAttachment = messageToElasticSearchJson.convertToJsonWithoutAttachment(message); + String convertToJsonWithoutAttachment = messageToElasticSearchJson.convertToJsonWithoutAttachment(message).block(); // Then assertThatJson(convertToJsonWithoutAttachment) @@ -443,9 +443,7 @@ class MessageToElasticSearchJsonTest { new JsoupTextExtractor(), ZoneId.of("Europe/Paris"), IndexAttachments.NO); - String convertToJsonWithoutAttachment = messageToElasticSearchJson.convertToJsonWithoutAttachment(message); - - System.out.println(convertToJsonWithoutAttachment); + String convertToJsonWithoutAttachment = messageToElasticSearchJson.convertToJsonWithoutAttachment(message).block(); // Then assertThatJson(convertToJsonWithoutAttachment) diff --git a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java 
b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java index 7a0112ed4f..f2afdd0793 100644 --- a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java +++ b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java @@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; +import org.apache.james.mailbox.extractor.ParsedContent; import org.apache.james.mailbox.model.ContentType.MediaType; import org.apache.james.mailbox.model.ContentType.SubType; import org.junit.jupiter.api.Test; @@ -45,7 +46,9 @@ class MimePartTest { .addBodyContent(new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8))) .addMediaType(MediaType.of("text")) .addSubType(SubType.of("plain")) - .build(); + .build() + .asMimePart((in, contentType) -> ParsedContent.empty()) + .block(); assertThat(mimePart.getTextualBody()).contains(body); } diff --git a/mailbox/tika/pom.xml b/mailbox/tika/pom.xml index 0abe712a88..567bd92787 100644 --- a/mailbox/tika/pom.xml +++ b/mailbox/tika/pom.xml @@ -63,10 +63,18 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> + <dependency> + <groupId>io.projectreactor.netty</groupId> + <artifactId>reactor-netty</artifactId> + </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> diff --git a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/CachingTextExtractor.java b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/CachingTextExtractor.java index 30cd94f35b..562fa60cc8 100644 --- 
a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/CachingTextExtractor.java +++ b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/CachingTextExtractor.java @@ -20,11 +20,10 @@ package org.apache.james.mailbox.tika; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.time.Duration; import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; import org.apache.james.mailbox.extractor.ParsedContent; @@ -34,17 +33,18 @@ import org.apache.james.metrics.api.GaugeRegistry; import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; +import com.github.benmanes.caffeine.cache.AsyncCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Weigher; import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.Weigher; import com.google.common.hash.Hashing; -import com.google.common.util.concurrent.UncheckedExecutionException; + +import reactor.core.publisher.Mono; public class CachingTextExtractor implements TextExtractor { private final TextExtractor underlying; - private final Cache<String, ParsedContent> cache; + private final AsyncCache<String, ParsedContent> cache; private final Metric weightMetric; public CachingTextExtractor(TextExtractor underlying, Duration cacheEvictionPeriod, Long cacheWeightInBytes, @@ -52,20 +52,22 @@ public class CachingTextExtractor implements TextExtractor { this.underlying = underlying; this.weightMetric = metricFactory.generate("textExtractor.cache.weight"); - Weigher<String, ParsedContent> weigher = - (key, parsedContent) -> computeWeight(parsedContent); RemovalListener<String, 
ParsedContent> removalListener = - notification -> Optional.ofNullable(notification.getValue()) + (key, value, removalCause) -> Optional.ofNullable(value) .map(this::computeWeight) .ifPresent(weightMetric::remove); - this.cache = CacheBuilder.newBuilder() - .expireAfterAccess(cacheEvictionPeriod.toMillis(), TimeUnit.MILLISECONDS) + Weigher<String, ParsedContent> weigher = + (key, parsedContent) -> computeWeight(parsedContent); + + cache = Caffeine.newBuilder() + .expireAfterAccess(Duration.ofMillis(cacheEvictionPeriod.toMillis())) .maximumWeight(cacheWeightInBytes) .weigher(weigher) + .evictionListener(removalListener) .recordStats() - .removalListener(removalListener) - .build(); + .buildAsync(); + recordStats(gaugeRegistry); } @@ -73,28 +75,28 @@ public class CachingTextExtractor implements TextExtractor { gaugeRegistry .register( "textExtractor.cache.hit.rate", - () -> cache.stats().hitRate()) + () -> cache.synchronous().stats().hitRate()) .register( "textExtractor.cache.hit.count", - () -> cache.stats().hitCount()); + () -> cache.synchronous().stats().hitCount()); gaugeRegistry.register( "textExtractor.cache.load.count", - () -> cache.stats().loadCount()) + () -> cache.synchronous().stats().loadCount()) .register( "textExtractor.cache.eviction.count", - () -> cache.stats().evictionCount()) + () -> cache.synchronous().stats().evictionCount()) .register( "textExtractor.cache.load.exception.rate", - () -> cache.stats().loadExceptionRate()) + () -> cache.synchronous().stats().loadFailureRate()) .register( "textExtractor.cache.load.miss.rate", - () -> cache.stats().missRate()) + () -> cache.synchronous().stats().missRate()) .register( "textExtractor.cache.load.miss.count", - () -> cache.stats().missCount()) + () -> cache.synchronous().stats().missCount()) .register( "textExtractor.cache.size", - cache::size); + cache.synchronous()::estimatedSize); } private int computeWeight(ParsedContent parsedContent) { @@ -109,21 +111,25 @@ public class CachingTextExtractor 
implements TextExtractor { } @Override - public ParsedContent extractContent(InputStream inputStream, ContentType contentType) throws Exception { - byte[] bytes = IOUtils.toByteArray(inputStream); - String key = Hashing.sha256().hashBytes(bytes).toString(); - + public Mono<ParsedContent> extractContentReactive(InputStream inputStream, ContentType contentType) { try { - return cache.get(key, () -> retrieveAndUpdateWeight(bytes, contentType)); - } catch (ExecutionException | UncheckedExecutionException e) { - throw unwrap(e); + byte[] bytes = IOUtils.toByteArray(inputStream); + String key = Hashing.sha256().hashBytes(bytes).toString(); + + return Mono.fromFuture(cache.get(key, (a, b) -> retrieveAndUpdateWeight(bytes, contentType).toFuture())); + } catch (IOException e) { + throw new RuntimeException(e); } } - private ParsedContent retrieveAndUpdateWeight(byte[] bytes, ContentType contentType) throws Exception { - ParsedContent parsedContent = underlying.extractContent(new ByteArrayInputStream(bytes), contentType); - weightMetric.add(computeWeight(parsedContent)); - return parsedContent; + @Override + public ParsedContent extractContent(InputStream inputStream, ContentType contentType) { + return extractContentReactive(inputStream, contentType).block(); + } + + private Mono<ParsedContent> retrieveAndUpdateWeight(byte[] bytes, ContentType contentType) { + return underlying.extractContentReactive(new ByteArrayInputStream(bytes), contentType) + .doOnNext(next -> weightMetric.add(computeWeight(next))); } private Exception unwrap(Exception e) { @@ -135,6 +141,6 @@ public class CachingTextExtractor implements TextExtractor { @VisibleForTesting long size() { - return cache.size(); + return cache.synchronous().estimatedSize(); } } diff --git a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java index d846b75268..63cf46ee3f 100644 --- 
a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java +++ b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java @@ -28,6 +28,8 @@ import org.apache.james.mailbox.model.ContentType.MimeType; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Mono; + public class ContentTypeFilteringTextExtractor implements TextExtractor { private final TextExtractor textExtractor; @@ -46,6 +48,14 @@ public class ContentTypeFilteringTextExtractor implements TextExtractor { return textExtractor.extractContent(inputStream, contentType); } + @Override + public Mono<ParsedContent> extractContentReactive(InputStream inputStream, ContentType contentType) { + if (isBlacklisted(contentType.mimeType())) { + return Mono.just(ParsedContent.empty()); + } + return textExtractor.extractContentReactive(inputStream, contentType); + } + private boolean isBlacklisted(MimeType contentType) { return contentTypeBlacklist.contains(contentType); } diff --git a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClient.java b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClient.java index ceae8ffd33..434a109598 100644 --- a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClient.java +++ b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClient.java @@ -19,11 +19,12 @@ package org.apache.james.mailbox.tika; import java.io.InputStream; -import java.util.Optional; import org.apache.james.mailbox.model.ContentType; +import reactor.core.publisher.Mono; + public interface TikaHttpClient { - Optional<InputStream> recursiveMetaDataAsJson(InputStream inputStream, ContentType contentType); + Mono<InputStream> recursiveMetaDataAsJson(InputStream inputStream, ContentType contentType); } diff --git a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClientImpl.java 
b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClientImpl.java index 5f00ca9efd..e7afed7fb7 100644 --- a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClientImpl.java +++ b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClientImpl.java @@ -18,19 +18,24 @@ ****************************************************************/ package org.apache.james.mailbox.tika; -import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.Charset; -import java.util.Optional; +import java.time.Duration; -import org.apache.http.client.fluent.Request; import org.apache.http.client.utils.URIBuilder; import org.apache.http.entity.ContentType; +import org.apache.james.util.ReactorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.HttpHeaderNames; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.netty.http.client.HttpClient; + public class TikaHttpClientImpl implements TikaHttpClient { private static final Logger LOGGER = LoggerFactory.getLogger(TikaHttpClientImpl.class); @@ -38,10 +43,14 @@ public class TikaHttpClientImpl implements TikaHttpClient { private final TikaConfiguration tikaConfiguration; private final URI recursiveMetaData; + private final HttpClient httpClient; public TikaHttpClientImpl(TikaConfiguration tikaConfiguration) throws URISyntaxException { this.tikaConfiguration = tikaConfiguration; this.recursiveMetaData = buildURI(tikaConfiguration).resolve(RECURSIVE_METADATA_AS_TEXT_ENDPOINT); + + httpClient = HttpClient.create() + .responseTimeout(Duration.ofMillis(tikaConfiguration.getTimeoutInMillis())); } private URI buildURI(TikaConfiguration tikaConfiguration) throws URISyntaxException { @@ -53,23 +62,31 @@ public class TikaHttpClientImpl implements TikaHttpClient { } @Override - public Optional<InputStream> 
recursiveMetaDataAsJson(InputStream inputStream, org.apache.james.mailbox.model.ContentType contentType) { - try { - ContentType httpContentType = ContentType.create(contentType.mimeType().asString(), - contentType.charset() - .map(Charset::name) - .orElse(null)); - return Optional.ofNullable( - Request.Put(recursiveMetaData) - .socketTimeout(tikaConfiguration.getTimeoutInMillis()) - .bodyStream(inputStream, httpContentType) - .execute() - .returnContent() - .asStream()); - } catch (IOException e) { - LOGGER.warn("Failing to call Tika for content type {}", contentType, e); - return Optional.empty(); - } + public Mono<InputStream> recursiveMetaDataAsJson(InputStream inputStream, org.apache.james.mailbox.model.ContentType contentType) { + ContentType httpContentType = ContentType.create(contentType.mimeType().asString(), + contentType.charset() + .map(Charset::name) + .orElse(null)); + + return httpClient + .headers(headers -> headers.set(HttpHeaderNames.CONTENT_TYPE, httpContentType.toString())) + .put() + .uri(recursiveMetaData) + .send(ReactorUtils.toChunks(inputStream, 16 * 1024) + .map(Unpooled::wrappedBuffer) + .subscribeOn(Schedulers.elastic())) + .responseSingle((resp, content) -> { + if (resp.status().code() == 200) { + return content.asInputStream(); + } else { + LOGGER.warn("Failing to call Tika for content type {} status {}", contentType, resp.status().code()); + return Mono.empty(); + } + }) + .onErrorResume(e -> { + LOGGER.warn("Failing to call Tika for content type {}", contentType, e); + return Mono.empty(); + }); } } diff --git a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaTextExtractor.java b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaTextExtractor.java index b7fc39fa50..8c639096b7 100644 --- a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaTextExtractor.java +++ b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaTextExtractor.java @@ -52,6 +52,8 @@ import 
com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import reactor.core.publisher.Mono; + public class TikaTextExtractor implements TextExtractor { private static final ContentType.MediaType TEXT = ContentType.MediaType.of("text"); @@ -77,24 +79,30 @@ public class TikaTextExtractor implements TextExtractor { } @Override - public ParsedContent extractContent(InputStream inputStream, ContentType contentType) throws Exception { + public Mono<ParsedContent> extractContentReactive(InputStream inputStream, ContentType contentType) { if (contentType.mediaType().equals(TEXT)) { - return jsoupTextExtractor.extractContent(inputStream, contentType); + return jsoupTextExtractor.extractContentReactive(inputStream, contentType); } - return metricFactory.decorateSupplierWithTimerMetric("tikaTextExtraction", Throwing.supplier( - () -> performContentExtraction(inputStream, contentType)) - .sneakyThrow()); + return Mono.from(metricFactory.decoratePublisherWithTimerMetric("tikaTextExtraction", + performContentExtraction(inputStream, contentType))); + } + + @Override + public ParsedContent extractContent(InputStream inputStream, ContentType contentType) throws Exception { + return extractContentReactive(inputStream, contentType) + .block(); } - public ParsedContent performContentExtraction(InputStream inputStream, ContentType contentType) throws IOException { - ContentAndMetadata contentAndMetadata = convert(tikaHttpClient.recursiveMetaDataAsJson(inputStream, contentType)); - return new ParsedContent(contentAndMetadata.getContent(), contentAndMetadata.getMetadata()); + public Mono<ParsedContent> performContentExtraction(InputStream inputStream, ContentType contentType) { + Mono<ContentAndMetadata> contentAndMetadata = convert(tikaHttpClient.recursiveMetaDataAsJson(inputStream, contentType)); + return contentAndMetadata + .map(result -> new ParsedContent(result.getContent(), result.getMetadata())); } - 
private ContentAndMetadata convert(Optional<InputStream> maybeInputStream) throws IOException { + private Mono<ContentAndMetadata> convert(Mono<InputStream> maybeInputStream) { return maybeInputStream .map(Throwing.function(inputStream -> objectMapper.readValue(inputStream, ContentAndMetadata.class))) - .orElse(ContentAndMetadata.empty()); + .switchIfEmpty(Mono.just(ContentAndMetadata.empty())); } @VisibleForTesting diff --git a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/CachingTextExtractorTest.java b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/CachingTextExtractorTest.java index 6722733e87..54cea13a42 100644 --- a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/CachingTextExtractorTest.java +++ b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/CachingTextExtractorTest.java @@ -19,7 +19,6 @@ package org.apache.james.mailbox.tika; -import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -54,7 +53,9 @@ import com.github.fge.lambdas.Throwing; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; -public class CachingTextExtractorTest { +import reactor.core.publisher.Mono; + +class CachingTextExtractorTest { private static final ParsedContent RESULT = new ParsedContent(Optional.of("content"), ImmutableMap.of()); public static final String BIG_STRING = Strings.repeat("0123456789", 103 * 1024); @@ -69,7 +70,7 @@ public class CachingTextExtractorTest { private TextExtractor wrappedTextExtractor; @BeforeEach - void setUp() throws Exception { + void setUp() { wrappedTextExtractor = mock(TextExtractor.class); textExtractor = new CachingTextExtractor(wrappedTextExtractor, TikaConfiguration.DEFAULT_CACHE_EVICTION_PERIOD, @@ -77,15 +78,15 @@ public class CachingTextExtractorTest { new RecordingMetricFactory(), new NoopGaugeRegistry()); - 
when(wrappedTextExtractor.extractContent(any(), any())) - .thenReturn(RESULT); + when(wrappedTextExtractor.extractContentReactive(any(), any())) + .thenReturn(Mono.just(RESULT)); } @Test void extractContentShouldCallUnderlyingTextExtractor() throws Exception { textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE); - verify(wrappedTextExtractor, times(1)).extractContent(any(), any()); + verify(wrappedTextExtractor, times(1)).extractContentReactive(any(), any()); verifyNoMoreInteractions(wrappedTextExtractor); } @@ -94,79 +95,30 @@ public class CachingTextExtractorTest { textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE); textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE); - verify(wrappedTextExtractor, times(1)).extractContent(any(), any()); + verify(wrappedTextExtractor, times(1)).extractContentReactive(any(), any()); verifyNoMoreInteractions(wrappedTextExtractor); } @Test - void extractContentShouldPropagateCheckedException() throws Exception { + void extractContentShouldPropagateCheckedException() { IOException ioException = new IOException("Any"); - when(wrappedTextExtractor.extractContent(any(), any())) - .thenThrow(ioException); + when(wrappedTextExtractor.extractContentReactive(any(), any())) + .thenReturn(Mono.error(ioException)); assertThatThrownBy(() -> textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE)) - .isEqualTo(ioException); + .hasCause(ioException); } @Test - void extractContentShouldPropagateRuntimeException() throws Exception { + void extractContentShouldPropagateRuntimeException() { RuntimeException runtimeException = new RuntimeException("Any"); - when(wrappedTextExtractor.extractContent(any(), any())) - .thenThrow(runtimeException); + when(wrappedTextExtractor.extractContentReactive(any(), any())) + .thenReturn(Mono.error(runtimeException)); assertThatThrownBy(() -> textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE)) .isEqualTo(runtimeException); } - @Test - void 
cacheShouldEvictEntriesWhenFull() throws Exception { - when(wrappedTextExtractor.extractContent(any(), any())) - .thenReturn(_2MiB_RESULT); - - IntStream.range(0, 10) - .mapToObj(STREAM_GENERATOR::apply) - .forEach(Throwing.consumer(inputStream -> textExtractor.extractContent(inputStream, CONTENT_TYPE))); - - assertThat(textExtractor.size()) - .isLessThanOrEqualTo(5); - } - - @Test - void olderEntriesShouldBeEvictedFirst() throws Exception { - when(wrappedTextExtractor.extractContent(any(), any())) - .thenReturn(_2MiB_RESULT); - - IntStream.range(0, 10) - .mapToObj(STREAM_GENERATOR::apply) - .forEach(Throwing.consumer(inputStream -> textExtractor.extractContent(inputStream, CONTENT_TYPE))); - - reset(wrappedTextExtractor); - when(wrappedTextExtractor.extractContent(any(), any())) - .thenReturn(_2MiB_RESULT); - - textExtractor.extractContent(STREAM_GENERATOR.apply(1), CONTENT_TYPE); - - verify(wrappedTextExtractor).extractContent(any(), any()); - } - - @Test - void youngerEntriesShouldBePreservedByEviction() throws Exception { - when(wrappedTextExtractor.extractContent(any(), any())) - .thenReturn(_2MiB_RESULT); - - IntStream.range(0, 10) - .mapToObj(STREAM_GENERATOR::apply) - .forEach(Throwing.consumer(inputStream -> textExtractor.extractContent(inputStream, CONTENT_TYPE))); - - reset(wrappedTextExtractor); - when(wrappedTextExtractor.extractContent(any(), any())) - .thenReturn(_2MiB_RESULT); - - textExtractor.extractContent(STREAM_GENERATOR.apply(9), CONTENT_TYPE); - - verifyZeroInteractions(wrappedTextExtractor); - } - @Test void frequentlyAccessedEntriesShouldBePreservedByEviction() throws Exception { when(wrappedTextExtractor.extractContent(any(), any())) @@ -191,7 +143,7 @@ public class CachingTextExtractorTest { .threadCount(10) .runSuccessfullyWithin(Duration.ofMinutes(1)); - verify(wrappedTextExtractor, times(1)).extractContent(any(), any()); + verify(wrappedTextExtractor, times(1)).extractContentReactive(any(), any()); } } \ No newline at end of file diff 
--git a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaTextExtractorTest.java b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaTextExtractorTest.java index e47cee2cee..6c0f76ac0f 100644 --- a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaTextExtractorTest.java +++ b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaTextExtractorTest.java @@ -26,7 +26,6 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.List; -import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.james.mailbox.extractor.ParsedContent; @@ -42,6 +41,8 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.TextNode; +import reactor.core.publisher.Mono; + class TikaTextExtractorTest { TextExtractor textExtractor; @@ -170,7 +171,7 @@ class TikaTextExtractorTest { void deserializerShouldNotThrowWhenMoreThanOneNode() throws Exception { TikaTextExtractor textExtractor = new TikaTextExtractor( new RecordingMetricFactory(), - (inputStream, contentType) -> Optional.of(new ByteArrayInputStream(("[{\"X-TIKA:content\": \"This is an awesome LibreOffice document !\"}, " + + (inputStream, contentType) -> Mono.just(new ByteArrayInputStream(("[{\"X-TIKA:content\": \"This is an awesome LibreOffice document !\"}, " + "{\"Chroma BlackIsZero\": \"true\"}]") .getBytes(StandardCharsets.UTF_8)))); @@ -183,7 +184,7 @@ class TikaTextExtractorTest { String expectedExtractedContent = "content A"; TikaTextExtractor textExtractor = new TikaTextExtractor( new RecordingMetricFactory(), - (inputStream, contentType) -> Optional.of(new ByteArrayInputStream(("[{\"X-TIKA:content\": \"" + expectedExtractedContent + "\"}, " + + (inputStream, contentType) -> Mono.just(new ByteArrayInputStream(("[{\"X-TIKA:content\": \"" + expectedExtractedContent + "\"}, " + "{\"X-TIKA:content\": 
\"content B\"}]") .getBytes(StandardCharsets.UTF_8)))); @@ -198,7 +199,7 @@ class TikaTextExtractorTest { void deserializerShouldThrowWhenNodeIsNotAnObject() { TikaTextExtractor textExtractor = new TikaTextExtractor( new RecordingMetricFactory(), - (inputStream, contentType) -> Optional.of(new ByteArrayInputStream("[\"value1\"]" + (inputStream, contentType) -> Mono.just(new ByteArrayInputStream("[\"value1\"]" .getBytes(StandardCharsets.UTF_8)))); InputStream inputStream = new ByteArrayInputStream("toto".getBytes(StandardCharsets.UTF_8)); diff --git a/pom.xml b/pom.xml index dce21c398c..0817a60cf4 100644 --- a/pom.xml +++ b/pom.xml @@ -2092,6 +2092,11 @@ <artifactId>jackson-datatype-jsr310</artifactId> <version>${jackson.version}</version> </dependency> + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>3.0.5</version> + </dependency> <dependency> <groupId>com.github.dpaukov</groupId> <artifactId>combinatoricslib3</artifactId> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
