This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e8bf0bf28 JAMES-3763 BlobStore backed MailRepository
8e8bf0bf28 is described below

commit 8e8bf0bf28facb7b2b2643629c63fb2fae143309
Author: Matthieu Baechler <[email protected]>
AuthorDate: Tue May 10 14:21:57 2022 +0200

    JAMES-3763 BlobStore backed MailRepository
    
    Co-authored-by: Jean Helou <[email protected]>
---
 server/mailrepository/mailrepository-blob/pom.xml  | 103 +++++++++++
 .../mailrepository/blob/BlobMailRepository.scala   | 205 +++++++++++++++++++++
 .../james/mailrepository/blob/MailMetadata.scala   |  96 ++++++++++
 .../blob/BlobMailRepositoryTest.java               |  60 ++++++
 server/pom.xml                                     |   1 +
 5 files changed, 465 insertions(+)

diff --git a/server/mailrepository/mailrepository-blob/pom.xml b/server/mailrepository/mailrepository-blob/pom.xml
new file mode 100644
index 0000000000..42610a8183
--- /dev/null
+++ b/server/mailrepository/mailrepository-blob/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.james</groupId>
+        <artifactId>james-server</artifactId>
+        <version>3.8.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>james-server-mailrepository-blob</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Apache James :: Server :: MailRepository :: Blob</name>
+
+    <dependencies>
+        <dependency>
+          <groupId>${james.groupId}</groupId>
+          <artifactId>apache-mailet-test</artifactId>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-memory</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-storage-strategy</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-lifecycle-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-mail-store</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-mailrepository-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-mailrepository-api</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>testing-base</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.play</groupId>
+            <artifactId>play-json_${scala.base}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-configuration2</artifactId>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/server/mailrepository/mailrepository-blob/src/main/scala/org/apache/james/mailrepository/blob/BlobMailRepository.scala b/server/mailrepository/mailrepository-blob/src/main/scala/org/apache/james/mailrepository/blob/BlobMailRepository.scala
new file mode 100644
index 0000000000..ef4ba6373e
--- /dev/null
+++ b/server/mailrepository/mailrepository-blob/src/main/scala/org/apache/james/mailrepository/blob/BlobMailRepository.scala
@@ -0,0 +1,205 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ************************************************************** */
+
+package org.apache.james.mailrepository.blob
+
+import java.{lang, util}
+import java.util.Date
+import java.util.stream.Stream
+
+import javax.mail.MessagingException
+import javax.mail.internet.MimeMessage
+
+import scala.jdk.CollectionConverters.IterableHasAsJava
+
+import org.apache.commons.lang3.tuple.Pair
+import org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED
+import org.apache.james.blob.api.Store.Impl
+import org.apache.james.blob.api.{BlobId, BlobPartsId, BlobStore, BlobStoreDAO, BlobType, BucketName, Store}
+import org.apache.james.blob.mail.MimeMessagePartsId
+import org.apache.james.mailrepository.api.{MailKey, MailRepository}
+import org.apache.mailet.{Attribute, AttributeName, AttributeValue, Mail, PerRecipientHeaders}
+
+import com.google.common.collect.ImmutableMap
+
+import play.api.libs.json.{Format, Json}
+import reactor.core.publisher.{Flux, Mono}
+import scala.jdk.StreamConverters._
+
+import org.apache.james.core.{MailAddress, MaybeSender}
+import org.apache.james.server.blob.deduplication.BlobStoreFactory
+import org.apache.james.server.core.MailImpl
+
+
+private[blob] object serializers {
+  implicit val headerFormat: Format[Header] = Json.format[Header]
+  implicit val mailMetadataFormat: Format[MailMetadata] = Json.format[MailMetadata]
+}
+
+object BlobMailRepository {
+  private[blob] object MailPartsId {
+    private[blob] val METADATA_BLOB_TYPE = new BlobType("mailMetadata", SIZE_BASED)
+
+    class Factory extends BlobPartsId.Factory[BlobMailRepository.MailPartsId] {
+      override def generate(map: util.Map[BlobType, BlobId]) = {
+        require(map.containsKey(METADATA_BLOB_TYPE), "Expecting 'mailMetadata' blobId to be specified")
+        require(map.size == 1, "blobId other than 'mailMetadata' are not supported")
+        new BlobMailRepository.MailPartsId(map.get(METADATA_BLOB_TYPE))
+      }
+    }
+  }
+
+  private[blob] case class MailPartsId private[blob](metadataBlobId: BlobId) extends BlobPartsId {
+    override def asMap = ImmutableMap.of(MailPartsId.METADATA_BLOB_TYPE, metadataBlobId)
+
+    def toMailKey: MailKey = new MailKey(metadataBlobId.asString())
+  }
+
+  private[blob] class MailEncoder private[blob](blobStoreDAO: BlobStoreDAO, blobIdFactory: BlobId.Factory)
+    extends Impl.Encoder[(Mail, MimeMessagePartsId)] {
+
+    import serializers._
+
+    override def encode(mailAndPartsId: (Mail, MimeMessagePartsId)): Stream[Pair[BlobType, Impl.ValueToSave]] = {
+      val (mail, partsIds) = mailAndPartsId
+      val mailMetadata = MailMetadata.of(mail, partsIds)
+      val payload = Json.stringify(Json.toJson(mailMetadata))
+      val mailKey = MailKey.forMail(mail)
+      val blobId=blobIdFactory.from(mailKey.asString())
+      val save: Impl.ValueToSave = (bucketName, _) =>
+        Mono.from(blobStoreDAO.save(bucketName, blobId, payload)).`then`(Mono.just(blobId))
+
+
+      LazyList(Pair.of(MailPartsId.METADATA_BLOB_TYPE, save)).asJavaSeqStream
+    }
+  }
+
+  private[blob] class MailDecoder private[blob](blobIdFactory: BlobId.Factory)
+    extends Impl.Decoder[(Mail, MimeMessagePartsId)] {
+
+    private def readMail(mailMetadata: MailMetadata): Mail = {
+      val builder = MailImpl.builder
+        .name(mailMetadata.name)
+        .sender(mailMetadata.sender.map(MaybeSender.getMailSender).getOrElse(MaybeSender.nullSender))
+        .addRecipients(mailMetadata.recipients.map(new MailAddress(_)).asJavaCollection)
+        .remoteAddr(mailMetadata.remoteAddr)
+        .remoteHost(mailMetadata.remoteHost)
+
+      mailMetadata.state.foreach(builder.state)
+      mailMetadata.errorMessage.foreach(builder.errorMessage)
+
+      mailMetadata.lastUpdated.map(Date.from).foreach(builder.lastUpdated)
+
+      mailMetadata.attributes.foreach { case (name, value) => builder.addAttribute(new Attribute(AttributeName.of(name), AttributeValue.fromJsonString(value))) }
+
+      builder.addAllHeadersForRecipients(retrievePerRecipientHeaders(mailMetadata.perRecipientHeaders))
+
+      builder.build
+    }
+
+
+    private def retrievePerRecipientHeaders(perRecipientHeaders: Map[String, Iterable[Header]]): PerRecipientHeaders = {
+      val result = new PerRecipientHeaders()
+      perRecipientHeaders.foreach { case (key, value) =>
+        value.foreach(headers => {
+          headers.values.foreach(header => {
+            val builder = PerRecipientHeaders.Header.builder().name(headers.key).value(header)
+            result.addHeaderForRecipient(builder, new MailAddress(key))
+          })
+        })
+      }
+      result
+    }
+
+
+    import serializers._
+
+    override def decode(streams: util.Map[BlobType, Store.CloseableByteSource]): (Mail, MimeMessagePartsId) = {
+      val source = streams.get(MailPartsId.METADATA_BLOB_TYPE)
+      val value = Json.fromJson[MailMetadata](Json.parse(source.openStream())).get
+      (readMail(value), value.mimePartsId(blobIdFactory))
+    }
+  }
+}
+
+class BlobMailRepository(val blobStore: BlobStoreDAO,
+                         val blobIdFactory: BlobId.Factory,
+                         val mimeMessageStore: Store[MimeMessage, MimeMessagePartsId]
+                        ) extends MailRepository {
+  private val bucketName: BucketName = BucketName.of("mailMetadata")
+  private val mimeMessageBucketName: BucketName = BucketName.of("mimeMessageData")
+  import BlobMailRepository._
+
+  @throws[MessagingException]
+  override def store(mc: Mail): MailKey = {
+    mimeMessageStore.save(mc.getMessage)
+      .flatMap(mimePartsId => mailMetadataStore.save((mc, mimePartsId)))
+      .map(mailPartsId => mailPartsId.toMailKey)
+      .block()
+  }
+
+  @throws[MessagingException]
+  override def size: Long = Flux.from(blobStore.listBlobs(bucketName)).count().block()
+
+  @throws[MessagingException]
+  override def list: util.Iterator[MailKey] = Flux.from(blobStore.listBlobs(bucketName))
+    .map[MailKey](blobId => new MailKey(blobId.asString))
+    .toIterable
+    .iterator
+
+  @throws[MessagingException]
+  override def retrieve(key: MailKey): Mail = {
+    mailMetadataStore.read(MailPartsId(blobIdFactory.from(key.asString())))
+      .flatMap {
+        case (mail, mimeMessagePartsId) => mimeMessageStore.read(mimeMessagePartsId).map { mimeMessage =>
+          mail.setMessage(mimeMessage)
+          Some(mail): Option[Mail]
+        }
+      }
+      .onErrorReturn(None)
+      .block()
+      .orNull
+  }
+
+  @throws[MessagingException]
+  override def remove(key: MailKey) = {
+    Mono.from(blobStore.delete(bucketName, blobIdFactory.from(key.asString()))).block()
+  }
+
+  @throws[MessagingException]
+  override def removeAll() = {
+    Flux.from(blobStore.listBlobs(bucketName))
+      .flatMap(blobId => blobStore.delete(bucketName, blobId))
+      .blockLast()
+  }
+
+  private val mailMetaDataBlobStore: BlobStore = BlobStoreFactory.builder()
+    .blobStoreDAO(blobStore)
+    .blobIdFactory(blobIdFactory)
+    .bucket(bucketName)
+    .passthrough()
+
+  val mailMetadataStore = new Store.Impl[(Mail, MimeMessagePartsId), BlobMailRepository.MailPartsId](
+    new MailPartsId.Factory,
+    new BlobMailRepository.MailEncoder(blobStore, blobIdFactory),
+    new BlobMailRepository.MailDecoder(blobIdFactory),
+    mailMetaDataBlobStore,
+    bucketName
+  )
+}
\ No newline at end of file
diff --git a/server/mailrepository/mailrepository-blob/src/main/scala/org/apache/james/mailrepository/blob/MailMetadata.scala b/server/mailrepository/mailrepository-blob/src/main/scala/org/apache/james/mailrepository/blob/MailMetadata.scala
new file mode 100644
index 0000000000..92df76ef0b
--- /dev/null
+++ b/server/mailrepository/mailrepository-blob/src/main/scala/org/apache/james/mailrepository/blob/MailMetadata.scala
@@ -0,0 +1,96 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ************************************************************** */
+package org.apache.james.mailrepository.blob
+
+import java.time.Instant
+
+import scala.jdk.CollectionConverters._
+import scala.jdk.StreamConverters._
+import scala.jdk.OptionConverters._
+
+import org.apache.james.blob.api.BlobId
+import org.apache.james.blob.mail.MimeMessagePartsId
+import org.apache.mailet.Mail
+
+private[blob] object Header {
+  def of: ((String, Iterable[String])) => Header = (this.apply _).tupled
+}
+
+private[blob] case class Header(key: String, values: Iterable[String])
+
+private[blob] object MailMetadata {
+  def of(mail: Mail, partsId: MimeMessagePartsId): MailMetadata = {
+    MailMetadata(
+      Option(mail.getRecipients).map(_.asScala.map(_.asString).toSeq).getOrElse(Seq.empty),
+      mail.getName,
+      mail.getMaybeSender.asOptional().map(_.asString()).toScala,
+      Option(mail.getState),
+      Option(mail.getErrorMessage),
+      Option(mail.getLastUpdated).map(_.toInstant),
+      serializedAttributes(mail),
+      mail.getRemoteAddr,
+      mail.getRemoteHost,
+      fromPerRecipientHeaders(mail),
+      partsId.getHeaderBlobId.asString(),
+      partsId.getBodyBlobId.asString()
+    )
+  }
+
+  private def serializedAttributes(mail: Mail): Map[String, String] =
+    mail.attributes().toScala(LazyList)
+      .map(attribute => attribute.getName.asString() -> attribute.getValue.toJson.toString)
+      .toMap
+
+  private def fromPerRecipientHeaders(mail: Mail): Map[String, Iterable[Header]] = {
+    mail.getPerRecipientSpecificHeaders
+      .getHeadersByRecipient
+      .asMap
+      .asScala
+      .view
+      .map { case (mailAddress, headers) =>
+        mailAddress.asString() -> headers
+          .asScala
+          .groupMap(_.getName)(_.getValue)
+          .map(Header.of)
+      }.toMap
+
+  }
+}
+
+private[blob] case class MailMetadata(
+                        recipients: Seq[String],
+                        name: String,
+                        sender: Option[String],
+                        state: Option[String],
+                        errorMessage: Option[String],
+                        lastUpdated: Option[Instant],
+                        attributes: Map[String, String],
+                        remoteAddr: String,
+                        remoteHost: String,
+                        perRecipientHeaders: Map[String, Iterable[Header]],
+                        headerBlobId: String,
+                        bodyBlobId: String){
+
+  def mimePartsId(implicit blobIdFactory: BlobId.Factory): MimeMessagePartsId =
+    MimeMessagePartsId.builder()
+      .headerBlobId(blobIdFactory.from(headerBlobId))
+      .bodyBlobId(blobIdFactory.from(bodyBlobId))
+      .build()
+
+}
\ No newline at end of file
diff --git a/server/mailrepository/mailrepository-blob/src/test/java/org/apache/james/mailrepository/blob/BlobMailRepositoryTest.java b/server/mailrepository/mailrepository-blob/src/test/java/org/apache/james/mailrepository/blob/BlobMailRepositoryTest.java
new file mode 100644
index 0000000000..40b31aeca2
--- /dev/null
+++ b/server/mailrepository/mailrepository-blob/src/test/java/org/apache/james/mailrepository/blob/BlobMailRepositoryTest.java
@@ -0,0 +1,60 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailrepository.blob;
+
+import javax.mail.internet.MimeMessage;
+
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.apache.james.blob.mail.MimeMessageStore;
+import org.apache.james.blob.memory.MemoryBlobStoreDAO;
+import org.apache.james.blob.memory.MemoryBlobStoreFactory;
+import org.apache.james.mailrepository.MailRepositoryContract;
+import org.apache.james.mailrepository.api.MailRepository;
+import org.junit.jupiter.api.BeforeEach;
+
+class BlobMailRepositoryTest implements MailRepositoryContract {
+
+    private BlobMailRepository blobMailRepository;
+
+    @BeforeEach
+    void setup() {
+        var blobIdFactory = new HashBlobId.Factory();
+        var blobStore = new MemoryBlobStoreDAO();
+        var mimeMessageBlobStore  = MemoryBlobStoreFactory.builder()
+                .blobIdFactory(blobIdFactory)
+                .defaultBucketName()
+                .passthrough();
+        MimeMessageStore.Factory mimeMessageStoreFactory = new MimeMessageStore.Factory(mimeMessageBlobStore);
+        Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore();
+        blobMailRepository = new BlobMailRepository(
+                blobStore,
+                blobIdFactory,
+                mimeMessageStore
+        );
+    }
+
+    @Override
+    public MailRepository retrieveRepository() {
+        return blobMailRepository;
+    }
+}
\ No newline at end of file
diff --git a/server/pom.xml b/server/pom.xml
index ff0c637b50..04cc1f2614 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -90,6 +90,7 @@
         <module>mailet/remote-delivery-integration-testing</module>
 
         <module>mailrepository/mailrepository-api</module>
+        <module>mailrepository/mailrepository-blob</module>
         <module>mailrepository/mailrepository-cassandra</module>
         <module>mailrepository/mailrepository-memory</module>
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to