This is an automated email from the ASF dual-hosted git repository.
matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 8e8bf0bf28 JAMES-3763 BlobStore backed MailRepository
8e8bf0bf28 is described below
commit 8e8bf0bf28facb7b2b2643629c63fb2fae143309
Author: Matthieu Baechler <[email protected]>
AuthorDate: Tue May 10 14:21:57 2022 +0200
JAMES-3763 BlobStore backed MailRepository
Co-authored-by: Jean Helou <[email protected]>
---
server/mailrepository/mailrepository-blob/pom.xml | 103 +++++++++++
.../mailrepository/blob/BlobMailRepository.scala | 205 +++++++++++++++++++++
.../james/mailrepository/blob/MailMetadata.scala | 96 ++++++++++
.../blob/BlobMailRepositoryTest.java | 60 ++++++
server/pom.xml | 1 +
5 files changed, 465 insertions(+)
diff --git a/server/mailrepository/mailrepository-blob/pom.xml
b/server/mailrepository/mailrepository-blob/pom.xml
new file mode 100644
index 0000000000..42610a8183
--- /dev/null
+++ b/server/mailrepository/mailrepository-blob/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.james</groupId>
+ <artifactId>james-server</artifactId>
+ <version>3.8.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>james-server-mailrepository-blob</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache James :: Server :: MailRepository :: Blob</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>apache-mailet-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>blob-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>blob-memory</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>blob-storage-strategy</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-lifecycle-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-mail-store</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-mailrepository-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-mailrepository-api</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-testing</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>testing-base</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.play</groupId>
+ <artifactId>play-json_${scala.base}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-configuration2</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/server/mailrepository/mailrepository-blob/src/main/scala/org/apache/james/mailrepository/blob/BlobMailRepository.scala
b/server/mailrepository/mailrepository-blob/src/main/scala/org/apache/james/mailrepository/blob/BlobMailRepository.scala
new file mode 100644
index 0000000000..ef4ba6373e
--- /dev/null
+++
b/server/mailrepository/mailrepository-blob/src/main/scala/org/apache/james/mailrepository/blob/BlobMailRepository.scala
@@ -0,0 +1,205 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ * ************************************************************** */
+
+package org.apache.james.mailrepository.blob
+
+import java.{lang, util}
+import java.util.Date
+import java.util.stream.Stream
+
+import javax.mail.MessagingException
+import javax.mail.internet.MimeMessage
+
+import scala.jdk.CollectionConverters.IterableHasAsJava
+
+import org.apache.commons.lang3.tuple.Pair
+import org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED
+import org.apache.james.blob.api.Store.Impl
+import org.apache.james.blob.api.{BlobId, BlobPartsId, BlobStore,
BlobStoreDAO, BlobType, BucketName, Store}
+import org.apache.james.blob.mail.MimeMessagePartsId
+import org.apache.james.mailrepository.api.{MailKey, MailRepository}
+import org.apache.mailet.{Attribute, AttributeName, AttributeValue, Mail,
PerRecipientHeaders}
+
+import com.google.common.collect.ImmutableMap
+
+import play.api.libs.json.{Format, Json}
+import reactor.core.publisher.{Flux, Mono}
+import scala.jdk.StreamConverters._
+
+import org.apache.james.core.{MailAddress, MaybeSender}
+import org.apache.james.server.blob.deduplication.BlobStoreFactory
+import org.apache.james.server.core.MailImpl
+
+
+/** Play JSON formats used to turn [[MailMetadata]] (and the [[Header]] values it holds) into JSON and back. */
+private[blob] object serializers {
+  // Kept first: MailMetadata carries Header values, so its derived format relies on this one.
+  implicit val headerFormat: Format[Header] =
+    Json.format[Header]
+
+  implicit val mailMetadataFormat: Format[MailMetadata] =
+    Json.format[MailMetadata]
+}
+
+/**
+ * Blob-level building blocks of the blob-backed mail repository: the parts id of a stored mail
+ * and the encoder/decoder that turn a mail's metadata into a JSON blob and back.
+ */
+object BlobMailRepository {
+  private[blob] object MailPartsId {
+    /** Blob type under which the JSON-serialized mail metadata is saved. */
+    private[blob] val METADATA_BLOB_TYPE = new BlobType("mailMetadata", SIZE_BASED)
+
+    /** Rebuilds a [[MailPartsId]] from a map that must contain the metadata blob id and nothing else. */
+    class Factory extends BlobPartsId.Factory[BlobMailRepository.MailPartsId] {
+      override def generate(map: util.Map[BlobType, BlobId]) = {
+        require(map.containsKey(METADATA_BLOB_TYPE), "Expecting 'mailMetadata' blobId to be specified")
+        require(map.size == 1, "blobId other than 'mailMetadata' are not supported")
+        new BlobMailRepository.MailPartsId(map.get(METADATA_BLOB_TYPE))
+      }
+    }
+  }
+
+  /**
+   * Parts id of a stored mail, made of the single blob id of its metadata.
+   *
+   * @param metadataBlobId id of the blob holding the JSON mail metadata
+   */
+  private[blob] case class MailPartsId private[blob](metadataBlobId: BlobId) extends BlobPartsId {
+    override def asMap = ImmutableMap.of(MailPartsId.METADATA_BLOB_TYPE, metadataBlobId)
+
+    /** The mail key is the string form of the metadata blob id. */
+    def toMailKey: MailKey = new MailKey(metadataBlobId.asString())
+  }
+
+  /**
+   * Encodes a mail and the parts id of its already-saved MIME message into one metadata blob.
+   *
+   * @param blobStoreDAO  DAO the metadata payload is written to
+   * @param blobIdFactory factory used to build the metadata blob id from the mail key
+   */
+  private[blob] class MailEncoder private[blob](blobStoreDAO: BlobStoreDAO, blobIdFactory: BlobId.Factory)
+    extends Impl.Encoder[(Mail, MimeMessagePartsId)] {
+
+    import serializers._
+
+    /**
+     * @param mailAndPartsId the mail together with the parts id of its MIME message
+     * @return a one-element stream pairing the metadata blob type with the action that saves it
+     */
+    override def encode(mailAndPartsId: (Mail, MimeMessagePartsId)): Stream[Pair[BlobType, Impl.ValueToSave]] = {
+      val (mail, partsIds) = mailAndPartsId
+      val mailMetadata = MailMetadata.of(mail, partsIds)
+      val payload = Json.stringify(Json.toJson(mailMetadata))
+      val mailKey = MailKey.forMail(mail)
+      // The blob id is built from the mail key, not from the payload.
+      val blobId = blobIdFactory.from(mailKey.asString())
+      val save: Impl.ValueToSave = (bucketName, _) =>
+        Mono.from(blobStoreDAO.save(bucketName, blobId, payload)).`then`(Mono.just(blobId))
+
+      LazyList(Pair.of(MailPartsId.METADATA_BLOB_TYPE, save)).asJavaSeqStream
+    }
+  }
+
+  /**
+   * Decodes a metadata blob back into a mail (without its MIME message) and the parts id of that message.
+   *
+   * @param blobIdFactory factory used to rebuild the MIME message blob ids stored in the metadata
+   */
+  private[blob] class MailDecoder private[blob](blobIdFactory: BlobId.Factory)
+    extends Impl.Decoder[(Mail, MimeMessagePartsId)] {
+
+    /** Rebuilds a mail from its metadata; optional fields are only set on the builder when present. */
+    private def readMail(mailMetadata: MailMetadata): Mail = {
+      val builder = MailImpl.builder
+        .name(mailMetadata.name)
+        .sender(mailMetadata.sender.map(MaybeSender.getMailSender).getOrElse(MaybeSender.nullSender))
+        .addRecipients(mailMetadata.recipients.map(new MailAddress(_)).asJavaCollection)
+        .remoteAddr(mailMetadata.remoteAddr)
+        .remoteHost(mailMetadata.remoteHost)
+
+      mailMetadata.state.foreach(builder.state)
+      mailMetadata.errorMessage.foreach(builder.errorMessage)
+
+      mailMetadata.lastUpdated.map(Date.from).foreach(builder.lastUpdated)
+
+      // Attribute values are stored as JSON strings and parsed back here.
+      mailMetadata.attributes.foreach { case (name, value) => builder.addAttribute(new Attribute(AttributeName.of(name), AttributeValue.fromJsonString(value))) }
+
+      builder.addAllHeadersForRecipients(retrievePerRecipientHeaders(mailMetadata.perRecipientHeaders))
+
+      builder.build
+    }
+
+    /** Expands the stored recipient -> headers map into one header entry per (recipient, name, value). */
+    private def retrievePerRecipientHeaders(perRecipientHeaders: Map[String, Iterable[Header]]): PerRecipientHeaders = {
+      val result = new PerRecipientHeaders()
+      perRecipientHeaders.foreach { case (key, value) =>
+        value.foreach(headers => {
+          headers.values.foreach(header => {
+            val builder = PerRecipientHeaders.Header.builder().name(headers.key).value(header)
+            result.addHeaderForRecipient(builder, new MailAddress(key))
+          })
+        })
+      }
+      result
+    }
+
+    import serializers._
+
+    /**
+     * @param streams byte sources by blob type; the entry for the metadata blob type is read
+     * @return the rebuilt mail and the parts id of its MIME message
+     */
+    override def decode(streams: util.Map[BlobType, Store.CloseableByteSource]): (Mail, MimeMessagePartsId) = {
+      val source = streams.get(MailPartsId.METADATA_BLOB_TYPE)
+      // Close the stream once parsing finishes or fails; it was previously never explicitly closed.
+      val json = scala.util.Using.resource(source.openStream())(stream => Json.parse(stream))
+      val value = Json.fromJson[MailMetadata](json).get
+      (readMail(value), value.mimePartsId(blobIdFactory))
+    }
+  }
+}
+
+/**
+ * [[MailRepository]] that keeps mails in blob storage: the MIME message is saved through
+ * `mimeMessageStore` and the rest of the mail is saved as a metadata blob in the
+ * `mailMetadata` bucket of `blobStore`.
+ *
+ * @param blobStore        DAO holding the metadata blobs
+ * @param blobIdFactory    factory turning mail keys and stored id strings into blob ids
+ * @param mimeMessageStore store used to save and read the MIME message of each mail
+ */
+class BlobMailRepository(val blobStore: BlobStoreDAO,
+                         val blobIdFactory: BlobId.Factory,
+                         val mimeMessageStore: Store[MimeMessage, MimeMessagePartsId]
+                        ) extends MailRepository {
+
+  import BlobMailRepository._
+
+  private val bucketName: BucketName = BucketName.of("mailMetadata")
+
+  private val mailMetaDataBlobStore: BlobStore = BlobStoreFactory.builder()
+    .blobStoreDAO(blobStore)
+    .blobIdFactory(blobIdFactory)
+    .bucket(bucketName)
+    .passthrough()
+
+  /** Store that encodes/decodes the mail metadata blob. */
+  val mailMetadataStore = new Store.Impl[(Mail, MimeMessagePartsId), BlobMailRepository.MailPartsId](
+    new MailPartsId.Factory,
+    new BlobMailRepository.MailEncoder(blobStore, blobIdFactory),
+    new BlobMailRepository.MailDecoder(blobIdFactory),
+    mailMetaDataBlobStore,
+    bucketName
+  )
+
+  /**
+   * Saves the MIME message first, then the metadata that references it.
+   *
+   * @param mc the mail to store
+   * @return the key derived from the saved metadata blob id
+   */
+  @throws[MessagingException]
+  override def store(mc: Mail): MailKey = {
+    mimeMessageStore.save(mc.getMessage)
+      .flatMap(mimePartsId => mailMetadataStore.save((mc, mimePartsId)))
+      .map(mailPartsId => mailPartsId.toMailKey)
+      .block()
+  }
+
+  /** @return the number of blobs listed in the metadata bucket */
+  @throws[MessagingException]
+  override def size: Long = Flux.from(blobStore.listBlobs(bucketName)).count().block()
+
+  /** @return one key per blob listed in the metadata bucket */
+  @throws[MessagingException]
+  override def list: util.Iterator[MailKey] = Flux.from(blobStore.listBlobs(bucketName))
+    .map[MailKey](blobId => new MailKey(blobId.asString))
+    .toIterable
+    .iterator
+
+  /**
+   * Reads the metadata for `key`, then the MIME message it references.
+   *
+   * @param key key of the mail to read
+   * @return the mail with its message attached, or `null` when reading signalled an error
+   */
+  @throws[MessagingException]
+  override def retrieve(key: MailKey): Mail = {
+    mailMetadataStore.read(MailPartsId(blobIdFactory.from(key.asString())))
+      .flatMap {
+        case (mail, mimeMessagePartsId) => mimeMessageStore.read(mimeMessagePartsId).map { mimeMessage =>
+          mail.setMessage(mimeMessage)
+          Some(mail): Option[Mail]
+        }
+      }
+      // NOTE(review): every error, not only a missing blob, is turned into a null result here.
+      .onErrorReturn(None)
+      .block()
+      .orNull
+  }
+
+  /**
+   * Deletes the metadata blob stored under `key`.
+   *
+   * NOTE(review): only the metadata blob is deleted here; the MIME message saved by `store`
+   * is not touched by this method - confirm whether it is cleaned up elsewhere.
+   */
+  @throws[MessagingException]
+  override def remove(key: MailKey): Unit = {
+    Mono.from(blobStore.delete(bucketName, blobIdFactory.from(key.asString()))).block()
+  }
+
+  /** Deletes every blob listed in the metadata bucket (metadata only, as in `remove`). */
+  @throws[MessagingException]
+  override def removeAll(): Unit = {
+    Flux.from(blobStore.listBlobs(bucketName))
+      .flatMap(blobId => blobStore.delete(bucketName, blobId))
+      .blockLast()
+  }
+}
\ No newline at end of file
diff --git
a/server/mailrepository/mailrepository-blob/src/main/scala/org/apache/james/mailrepository/blob/MailMetadata.scala
b/server/mailrepository/mailrepository-blob/src/main/scala/org/apache/james/mailrepository/blob/MailMetadata.scala
new file mode 100644
index 0000000000..92df76ef0b
--- /dev/null
+++
b/server/mailrepository/mailrepository-blob/src/main/scala/org/apache/james/mailrepository/blob/MailMetadata.scala
@@ -0,0 +1,96 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ * ************************************************************** */
+package org.apache.james.mailrepository.blob
+
+import java.time.Instant
+
+import scala.jdk.CollectionConverters._
+import scala.jdk.StreamConverters._
+import scala.jdk.OptionConverters._
+
+import org.apache.james.blob.api.BlobId
+import org.apache.james.blob.mail.MimeMessagePartsId
+import org.apache.mailet.Mail
+
+/** Factory helper for [[Header]]. */
+private[blob] object Header {
+  /** Function building a [[Header]] from a `(key, values)` pair. */
+  def of: ((String, Iterable[String])) => Header = {
+    case (key, values) => Header(key, values)
+  }
+}
+
+/**
+ * A header name together with all of its values.
+ *
+ * @param key    the header name
+ * @param values the values recorded under that name
+ */
+private[blob] case class Header(key: String, values: Iterable[String])
+
+/** Builds the serializable [[MailMetadata]] view of a mail. */
+private[blob] object MailMetadata {
+  /**
+   * Captures everything of `mail` except its MIME message, which is referenced through `partsId`.
+   *
+   * @param mail    the mail whose envelope, state, attributes and per-recipient headers are copied
+   * @param partsId ids of the header and body blobs of the mail's MIME message
+   */
+  def of(mail: Mail, partsId: MimeMessagePartsId): MailMetadata = {
+    MailMetadata(
+      // A null recipient collection is stored as an empty sequence.
+      recipients = Option(mail.getRecipients).map(_.asScala.map(_.asString).toSeq).getOrElse(Seq.empty),
+      name = mail.getName,
+      sender = mail.getMaybeSender.asOptional().map(_.asString()).toScala,
+      state = Option(mail.getState),
+      errorMessage = Option(mail.getErrorMessage),
+      lastUpdated = Option(mail.getLastUpdated).map(_.toInstant),
+      attributes = serializedAttributes(mail),
+      remoteAddr = mail.getRemoteAddr,
+      remoteHost = mail.getRemoteHost,
+      perRecipientHeaders = fromPerRecipientHeaders(mail),
+      headerBlobId = partsId.getHeaderBlobId.asString(),
+      bodyBlobId = partsId.getBodyBlobId.asString()
+    )
+  }
+
+  /** Maps each attribute name to the JSON string form of its value. */
+  private def serializedAttributes(mail: Mail): Map[String, String] = {
+    val entries =
+      for (attribute <- mail.attributes().toScala(LazyList))
+        yield attribute.getName.asString() -> attribute.getValue.toJson.toString
+    entries.toMap
+  }
+
+  /** Maps each recipient address to its headers, grouping the values that share a header name. */
+  private def fromPerRecipientHeaders(mail: Mail): Map[String, Iterable[Header]] = {
+    val headersByRecipient = mail.getPerRecipientSpecificHeaders
+      .getHeadersByRecipient
+      .asMap
+      .asScala
+
+    headersByRecipient
+      .view
+      .map { case (recipient, recipientHeaders) =>
+        val valuesByName = recipientHeaders.asScala.groupMap(_.getName)(_.getValue)
+        recipient.asString() -> valuesByName.map(Header.of)
+      }.toMap
+  }
+}
+
+/**
+ * Serializable snapshot of a mail, without its MIME message.
+ *
+ * @param recipients          recipient addresses as strings
+ * @param name                mail name
+ * @param sender              sender address, absent when the mail has none
+ * @param state               mail state, when set
+ * @param errorMessage        error message, when set
+ * @param lastUpdated         last update instant, when set
+ * @param attributes          attribute name to JSON-serialized attribute value
+ * @param remoteAddr          remote address recorded on the mail
+ * @param remoteHost          remote host recorded on the mail
+ * @param perRecipientHeaders recipient address to the headers specific to that recipient
+ * @param headerBlobId        string form of the MIME message header blob id
+ * @param bodyBlobId          string form of the MIME message body blob id
+ */
+private[blob] case class MailMetadata(
+  recipients: Seq[String],
+  name: String,
+  sender: Option[String],
+  state: Option[String],
+  errorMessage: Option[String],
+  lastUpdated: Option[Instant],
+  attributes: Map[String, String],
+  remoteAddr: String,
+  remoteHost: String,
+  perRecipientHeaders: Map[String, Iterable[Header]],
+  headerBlobId: String,
+  bodyBlobId: String) {
+
+  /** Rebuilds the parts id of the MIME message from the two stored blob id strings. */
+  def mimePartsId(implicit blobIdFactory: BlobId.Factory): MimeMessagePartsId = {
+    val headerId = blobIdFactory.from(headerBlobId)
+    val bodyId = blobIdFactory.from(bodyBlobId)
+    MimeMessagePartsId.builder()
+      .headerBlobId(headerId)
+      .bodyBlobId(bodyId)
+      .build()
+  }
+}
\ No newline at end of file
diff --git
a/server/mailrepository/mailrepository-blob/src/test/java/org/apache/james/mailrepository/blob/BlobMailRepositoryTest.java
b/server/mailrepository/mailrepository-blob/src/test/java/org/apache/james/mailrepository/blob/BlobMailRepositoryTest.java
new file mode 100644
index 0000000000..40b31aeca2
--- /dev/null
+++
b/server/mailrepository/mailrepository-blob/src/test/java/org/apache/james/mailrepository/blob/BlobMailRepositoryTest.java
@@ -0,0 +1,60 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailrepository.blob;
+
+import javax.mail.internet.MimeMessage;
+
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.apache.james.blob.mail.MimeMessageStore;
+import org.apache.james.blob.memory.MemoryBlobStoreDAO;
+import org.apache.james.blob.memory.MemoryBlobStoreFactory;
+import org.apache.james.mailrepository.MailRepositoryContract;
+import org.apache.james.mailrepository.api.MailRepository;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Runs the {@link MailRepositoryContract} tests against a {@link BlobMailRepository}
+ * whose metadata is held by a {@link MemoryBlobStoreDAO}.
+ */
+class BlobMailRepositoryTest implements MailRepositoryContract {
+
+    private BlobMailRepository blobMailRepository;
+
+    /** Builds a fresh repository before each test. */
+    @BeforeEach
+    void setup() {
+        HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
+        MemoryBlobStoreDAO metadataBlobStoreDao = new MemoryBlobStoreDAO();
+
+        BlobStore mimeMessageBlobStore = MemoryBlobStoreFactory.builder()
+            .blobIdFactory(blobIdFactory)
+            .defaultBucketName()
+            .passthrough();
+        Store<MimeMessage, MimeMessagePartsId> mimeMessageStore =
+            new MimeMessageStore.Factory(mimeMessageBlobStore).mimeMessageStore();
+
+        blobMailRepository = new BlobMailRepository(metadataBlobStoreDao, blobIdFactory, mimeMessageStore);
+    }
+
+    @Override
+    public MailRepository retrieveRepository() {
+        return blobMailRepository;
+    }
+}
\ No newline at end of file
diff --git a/server/pom.xml b/server/pom.xml
index ff0c637b50..04cc1f2614 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -90,6 +90,7 @@
<module>mailet/remote-delivery-integration-testing</module>
<module>mailrepository/mailrepository-api</module>
+ <module>mailrepository/mailrepository-blob</module>
<module>mailrepository/mailrepository-cassandra</module>
<module>mailrepository/mailrepository-memory</module>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]