JAMES-2291 Implement CassandraMailRepository

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d281a05a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d281a05a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d281a05a

Branch: refs/heads/master
Commit: d281a05a6ab8960054d799a815292fd13d83e482
Parents: 0806d78
Author: benwa <[email protected]>
Authored: Wed Jan 24 13:34:42 2018 +0700
Committer: benwa <[email protected]>
Committed: Thu Jan 25 16:28:45 2018 +0700

----------------------------------------------------------------------
 .../mailbox/store/StoreMessageManager.java      |   2 +-
 .../store/streaming/BodyOffsetInputStream.java  | 159 -------------------
 .../streaming/BodyOffsetInputStreamTest.java    |  72 ---------
 .../james/util/BodyOffsetInputStream.java       | 159 +++++++++++++++++++
 .../james/util/BodyOffsetInputStreamTest.java   |  72 +++++++++
 .../mailrepository/MailRepositoryContract.java  |  11 ++
 .../mailrepository-cassandra/pom.xml            |  15 +-
 .../cassandra/CassandraMailRepository.java      | 157 +++++++++++++++++-
 .../cassandra/CassandraMailRepositoryTest.java  |  45 +++++-
 9 files changed, 446 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index 38491a1..704d881 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -72,7 +72,6 @@ import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.mailbox.store.quota.QuotaChecker;
 import org.apache.james.mailbox.store.search.MessageSearchIndex;
-import org.apache.james.mailbox.store.streaming.BodyOffsetInputStream;
 import org.apache.james.mailbox.store.streaming.CountingInputStream;
 import org.apache.james.mime4j.MimeException;
 import org.apache.james.mime4j.message.DefaultBodyDescriptorBuilder;
@@ -82,6 +81,7 @@ import org.apache.james.mime4j.stream.EntityState;
 import org.apache.james.mime4j.stream.MimeConfig;
 import org.apache.james.mime4j.stream.MimeTokenStream;
 import org.apache.james.mime4j.stream.RecursionMode;
+import org.apache.james.util.BodyOffsetInputStream;
 import org.apache.james.util.IteratorWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStream.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStream.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStream.java
deleted file mode 100644
index 96474a5..0000000
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStream.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package org.apache.james.mailbox.store.streaming;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PushbackInputStream;
-
-/**
- * {@link InputStream} which helps to keep track of the BodyOffset of the wrapped
- * {@link InputStream}
- *  
- *  IMPORTANT: This class is not thread-safe!
- *
- */
-public class BodyOffsetInputStream extends InputStream {
-    private long count = 0;
-    private long bodyStartOctet = -1;
-    private final PushbackInputStream in;
-    private long readBytes = 0;
-
-    public BodyOffsetInputStream(InputStream in) {
-        // we need to pushback at max 3 bytes
-        this.in = new PushbackInputStream(in, 3);
-    }
-
-    /**
-     * @see java.io.InputStream#read()
-     */
-    public int read() throws IOException {
-        int i = in.read();
-        if (i != -1) {
-            readBytes++;
-            if (bodyStartOctet == -1 && i == 0x0D) {
-                int a = in.read();
-                if (a == 0x0A) {
-                    int b = in.read();
-
-                    if (b == 0x0D) {
-                        int c = in.read();
-
-                        if (c == 0x0A) {
-                            bodyStartOctet = count + 4;
-                        }
-                        in.unread(c);
-                    }
-                    in.unread(b);
-                }
-                in.unread(a);
-            }
-            count++;
-        }
-        return i;
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        if (bodyStartOctet == -1) {
-            return super.read(b, off, len);
-        } else {
-            int r = in.read(b, off, len);
-            if (r != -1) {
-                readBytes += r;
-            }
-            return r;
-        }
-    }
-
-    @Override
-    public int read(byte[] b) throws IOException {
-        if (bodyStartOctet == -1) {
-            return super.read(b);
-        } else {
-            int r = in.read(b);
-            if (r != -1) {
-                readBytes += r;
-            }
-            return r;
-        }
-    }
-
-    @Override
-    public int available() throws IOException {
-        return in.available();
-    }
-
-    @Override
-    public void close() throws IOException {
-        in.close();
-    }
-
-    @Override
-    public void mark(int readlimit) {
-        
-    }
-
-    /**
-     * Mark is not supported by this implementation
-     */
-    public boolean markSupported() {
-        return false;
-    }
-
-    /**
-     * Throws {@link IOException}
-     */
-    public void reset() throws IOException {
-        throw new IOException("Not supported");
-    }
-
-    @Override
-    public long skip(long n) throws IOException {
-        long i = 0; 
-        while (i++ < n) {
-            if (read() == -1) {
-                break;
-            }
-        }
-        return i;
-    }
-    
-    /**
-     * Return the bodyStartOffset or -1 if it could not be found. 
-     * Be aware you can only expect some valid result from the method
-     * if you have consumed the whole InputStream or if you are
-     * sure that you reached the body
-     * 
-     * @return offset
-     */
-    public long getBodyStartOffset() {
-        return bodyStartOctet;
-    }
-    
-    /**
-     * Return the read bytes so far
-     * 
-     * @return readBytes
-     */
-    public long getReadBytes() {
-        return readBytes;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/mailbox/store/src/test/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStreamTest.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStreamTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStreamTest.java
deleted file mode 100644
index 00b38c0..0000000
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStreamTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package org.apache.james.mailbox.store.streaming;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-
-import org.junit.Test;
-
-public class BodyOffsetInputStreamTest {
-    private String mail = "Subject: test\r\n\r\nbody";
-    private long expectedOffset = 17;
-    private long bytes = mail.length();
-    
-    @Test
-    public void testRead() throws IOException {
-        BodyOffsetInputStream in = new BodyOffsetInputStream(new ByteArrayInputStream(mail.getBytes()));
-        
-        while (in.read() != -1) {
-            // consume stream
-        }
-        assertEquals(expectedOffset, in.getBodyStartOffset());
-        assertEquals(bytes, in.getReadBytes());
-        in.close();
-    }
-    
-    
-    @Test
-    public void testReadWithArray() throws IOException {
-        BodyOffsetInputStream in = new BodyOffsetInputStream(new ByteArrayInputStream(mail.getBytes()));
-        
-        byte[] b = new byte[8];
-        while (in.read(b) != -1) {
-            // consume stream
-        }
-        assertEquals(expectedOffset, in.getBodyStartOffset());
-        assertEquals(bytes, in.getReadBytes());
-        in.close();
-    }
-    
-    
-    @Test
-    public void testReadWithArrayBiggerThenStream() throws IOException {
-        BodyOffsetInputStream in = new BodyOffsetInputStream(new ByteArrayInputStream(mail.getBytes()));
-        
-        byte[] b = new byte[4096];
-        while (in.read(b) != -1) {
-            // consume stream
-        }
-        assertEquals(expectedOffset, in.getBodyStartOffset());
-        assertEquals(bytes, in.getReadBytes());
-        in.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/server/container/util/src/main/java/org/apache/james/util/BodyOffsetInputStream.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/BodyOffsetInputStream.java b/server/container/util/src/main/java/org/apache/james/util/BodyOffsetInputStream.java
new file mode 100644
index 0000000..ddb4dfe
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/BodyOffsetInputStream.java
@@ -0,0 +1,159 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PushbackInputStream;
+
+/**
+ * {@link InputStream} which helps to keep track of the BodyOffset of the wrapped
+ * {@link InputStream}
+ *  
+ *  IMPORTANT: This class is not thread-safe!
+ *
+ */
+public class BodyOffsetInputStream extends InputStream {
+    private long count = 0;
+    private long bodyStartOctet = -1;
+    private final PushbackInputStream in;
+    private long readBytes = 0;
+
+    public BodyOffsetInputStream(InputStream in) {
+        // we need to pushback at max 3 bytes
+        this.in = new PushbackInputStream(in, 3);
+    }
+
+    /**
+     * @see InputStream#read()
+     */
+    public int read() throws IOException {
+        int i = in.read();
+        if (i != -1) {
+            readBytes++;
+            if (bodyStartOctet == -1 && i == 0x0D) {
+                int a = in.read();
+                if (a == 0x0A) {
+                    int b = in.read();
+
+                    if (b == 0x0D) {
+                        int c = in.read();
+
+                        if (c == 0x0A) {
+                            bodyStartOctet = count + 4;
+                        }
+                        in.unread(c);
+                    }
+                    in.unread(b);
+                }
+                in.unread(a);
+            }
+            count++;
+        }
+        return i;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (bodyStartOctet == -1) {
+            return super.read(b, off, len);
+        } else {
+            int r = in.read(b, off, len);
+            if (r != -1) {
+                readBytes += r;
+            }
+            return r;
+        }
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        if (bodyStartOctet == -1) {
+            return super.read(b);
+        } else {
+            int r = in.read(b);
+            if (r != -1) {
+                readBytes += r;
+            }
+            return r;
+        }
+    }
+
+    @Override
+    public int available() throws IOException {
+        return in.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public void mark(int readlimit) {
+        
+    }
+
+    /**
+     * Mark is not supported by this implementation
+     */
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Throws {@link IOException}
+     */
+    public void reset() throws IOException {
+        throw new IOException("Not supported");
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        long i = 0; 
+        while (i++ < n) {
+            if (read() == -1) {
+                break;
+            }
+        }
+        return i;
+    }
+    
+    /**
+     * Return the bodyStartOffset or -1 if it could not be found. 
+     * Be aware you can only expect some valid result from the method
+     * if you have consumed the whole InputStream or if you are
+     * sure that you reached the body
+     * 
+     * @return offset
+     */
+    public long getBodyStartOffset() {
+        return bodyStartOctet;
+    }
+    
+    /**
+     * Return the read bytes so far
+     * 
+     * @return readBytes
+     */
+    public long getReadBytes() {
+        return readBytes;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/server/container/util/src/test/java/org/apache/james/util/BodyOffsetInputStreamTest.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/BodyOffsetInputStreamTest.java b/server/container/util/src/test/java/org/apache/james/util/BodyOffsetInputStreamTest.java
new file mode 100644
index 0000000..645d337
--- /dev/null
+++ b/server/container/util/src/test/java/org/apache/james/util/BodyOffsetInputStreamTest.java
@@ -0,0 +1,72 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.junit.Test;
+
+public class BodyOffsetInputStreamTest {
+    private String mail = "Subject: test\r\n\r\nbody";
+    private long expectedOffset = 17;
+    private long bytes = mail.length();
+    
+    @Test
+    public void testRead() throws IOException {
+        BodyOffsetInputStream in = new BodyOffsetInputStream(new ByteArrayInputStream(mail.getBytes()));
+        
+        while (in.read() != -1) {
+            // consume stream
+        }
+        assertEquals(expectedOffset, in.getBodyStartOffset());
+        assertEquals(bytes, in.getReadBytes());
+        in.close();
+    }
+    
+    
+    @Test
+    public void testReadWithArray() throws IOException {
+        BodyOffsetInputStream in = new BodyOffsetInputStream(new ByteArrayInputStream(mail.getBytes()));
+        
+        byte[] b = new byte[8];
+        while (in.read(b) != -1) {
+            // consume stream
+        }
+        assertEquals(expectedOffset, in.getBodyStartOffset());
+        assertEquals(bytes, in.getReadBytes());
+        in.close();
+    }
+    
+    
+    @Test
+    public void testReadWithArrayBiggerThenStream() throws IOException {
+        BodyOffsetInputStream in = new BodyOffsetInputStream(new ByteArrayInputStream(mail.getBytes()));
+        
+        byte[] b = new byte[4096];
+        while (in.read(b) != -1) {
+            // consume stream
+        }
+        assertEquals(expectedOffset, in.getBodyStartOffset());
+        assertEquals(bytes, in.getReadBytes());
+        in.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/server/mailrepository/mailrepository-api/src/test/java/org/apache/james/mailrepository/MailRepositoryContract.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-api/src/test/java/org/apache/james/mailrepository/MailRepositoryContract.java b/server/mailrepository/mailrepository-api/src/test/java/org/apache/james/mailrepository/MailRepositoryContract.java
index d52c8cc..e6c9fc1 100644
--- a/server/mailrepository/mailrepository-api/src/test/java/org/apache/james/mailrepository/MailRepositoryContract.java
+++ b/server/mailrepository/mailrepository-api/src/test/java/org/apache/james/mailrepository/MailRepositoryContract.java
@@ -172,6 +172,17 @@ public interface MailRepositoryContract {
     }
 
     @Test
+    default void retrieveShouldReturnNullAfterRemoveAll() throws Exception {
+        MailRepository testee = retrieveRepository();
+        String name = "name";
+        testee.store(createMail(name));
+
+        testee.removeAll();
+
+        assertThat(testee.retrieve(name)).isNull();
+    }
+
+    @Test
     default void removeAllShouldBeIdempotent() throws Exception {
         MailRepository testee = retrieveRepository();
         testee.store(createMail("name"));

http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/server/mailrepository/mailrepository-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/pom.xml b/server/mailrepository/mailrepository-cassandra/pom.xml
index d55228d..fde14ef 100644
--- a/server/mailrepository/mailrepository-cassandra/pom.xml
+++ b/server/mailrepository/mailrepository-cassandra/pom.xml
@@ -49,6 +49,11 @@
         </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
+            <artifactId>blob-cassandra</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
             <artifactId>blob-api</artifactId>
             <type>test-jar</type>
             <scope>test</scope>
@@ -58,11 +63,6 @@
             <artifactId>james-server-core</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.james</groupId>
-            <artifactId>james-server-testing</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>james-server-mailrepository-api</artifactId>
         </dependency>
@@ -73,6 +73,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>james-server-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.assertj</groupId>
             <artifactId>assertj-core</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index b1f171e..9fd5d3f 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -19,48 +19,195 @@
 
 package org.apache.james.mailrepository.cassandra;
 
+import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 
 import javax.mail.MessagingException;
+import javax.mail.Session;
+import javax.mail.internet.MimeMessage;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.ObjectStore;
 import org.apache.james.mailrepository.api.MailRepository;
+import org.apache.james.util.BodyOffsetInputStream;
+import org.apache.james.util.CompletableFutureUtil;
+import org.apache.james.util.FluentFutureStream;
 import org.apache.mailet.Mail;
 
+import com.github.fge.lambdas.Throwing;
+import com.google.common.base.Throwables;
+import com.google.common.primitives.Bytes;
+
 public class CassandraMailRepository implements MailRepository {
 
-    public CassandraMailRepository() {
+    private final String url;
+    private final CassandraMailRepositoryKeysDAO keysDAO;
+    private final CassandraMailRepositoryCountDAO countDAO;
+    private final CassandraMailRepositoryMailDAO mailDAO;
+    private final ObjectStore objectStore;
+
+    public CassandraMailRepository(String url, CassandraMailRepositoryKeysDAO keysDAO, CassandraMailRepositoryCountDAO countDAO, CassandraMailRepositoryMailDAO mailDAO, ObjectStore objectStore) {
+        this.url = url;
+        this.keysDAO = keysDAO;
+        this.countDAO = countDAO;
+        this.mailDAO = mailDAO;
+        this.objectStore = objectStore;
     }
 
     @Override
-    public void store(Mail mail) {
+    public void store(Mail mail) throws MessagingException {
+        try {
+            Pair<byte[], byte[]> splitHeaderBody = splitHeaderBody(mail.getMessage());
+
+            CompletableFuture<Pair<BlobId, BlobId>> blobIds = CompletableFutureUtil.combine(
+                objectStore.save(splitHeaderBody.getLeft()),
+                objectStore.save(splitHeaderBody.getRight()),
+                Pair::of);
+
+            blobIds.thenCompose(Throwing.function(pair ->
+                mailDAO.store(url, mail, pair.getLeft(), pair.getRight())))
+                .thenCompose(any -> CompletableFuture.allOf(
+                    countDAO.increment(url),
+                    keysDAO.store(url, mail.getName())))
+                .join();
+        } catch (IOException e) {
+            throw new MessagingException("Exception while storing mail", e);
+        }
+    }
+
+    public Pair<byte[], byte[]> splitHeaderBody(MimeMessage message) throws IOException, MessagingException {
+        byte[] messageAsArray = messageToArray(message);
+        int bodyStartOctet = computeBodyStartOctet(messageAsArray);
+
+        return Pair.of(
+            getHeaderBytes(messageAsArray, bodyStartOctet),
+            getBodyBytes(messageAsArray, bodyStartOctet));
+    }
+
+    public byte[] messageToArray(MimeMessage message) throws IOException, MessagingException {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        message.writeTo(byteArrayOutputStream);
+        return byteArrayOutputStream.toByteArray();
+    }
+
+    public byte[] getHeaderBytes(byte[] messageContentAsArray, int bodyStartOctet) {
+        ByteBuffer headerContent = ByteBuffer.wrap(messageContentAsArray, 0, bodyStartOctet);
+        byte[] headerBytes = new byte[bodyStartOctet];
+        headerContent.get(headerBytes);
+        return headerBytes;
+    }
+
+    public byte[] getBodyBytes(byte[] messageContentAsArray, int bodyStartOctet) {
+        if (bodyStartOctet < messageContentAsArray.length) {
+            ByteBuffer bodyContent = ByteBuffer.wrap(messageContentAsArray,
+                bodyStartOctet,
+                messageContentAsArray.length - bodyStartOctet);
+            byte[] bodyBytes = new byte[messageContentAsArray.length - bodyStartOctet];
+            bodyContent.get(bodyBytes);
+            return bodyBytes;
+        } else {
+            return new byte[] {};
+        }
+    }
+
+    public int computeBodyStartOctet(byte[] messageAsArray) throws IOException {
+        try (BodyOffsetInputStream bodyOffsetInputStream =
+                 new BodyOffsetInputStream(new ByteArrayInputStream(messageAsArray))) {
+            consume(bodyOffsetInputStream);
+
+            if (bodyOffsetInputStream.getBodyStartOffset() == -1) {
+                return 0;
+            }
+            return (int) bodyOffsetInputStream.getBodyStartOffset();
+        }
+    }
+
+    private void consume(InputStream in) throws IOException {
+        IOUtils.copy(in, NULL_OUTPUT_STREAM);
     }
 
     @Override
     public Iterator<String> list() {
-        return null;
+        return keysDAO.list(url)
+            .join()
+            .iterator();
     }
 
     @Override
     public Mail retrieve(String key) {
-        return null;
+        return CompletableFutureUtil
+            .unwrap(mailDAO.read(url, key)
+                .thenApply(optional -> optional.map(this::toMail)))
+            .join()
+            .orElse(null);
+    }
+
+    public CompletableFuture<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) {
+        return CompletableFutureUtil.combine(
+            objectStore.read(mailDTO.getHeaderBlobId()),
+            objectStore.read(mailDTO.getBodyBlobId()),
+            Bytes::concat)
+            .thenApply(this::toMimeMessage)
+            .thenApply(mimeMessage -> mailDTO.getMailBuilder()
+                .mimeMessage(mimeMessage)
+                .build());
+    }
+
+    public MimeMessage toMimeMessage(byte[] bytes) {
+        try {
+            return new MimeMessage(Session.getInstance(new Properties()), new ByteArrayInputStream(bytes));
+        } catch (MessagingException e) {
+            throw Throwables.propagate(e);
+        }
     }
 
     @Override
     public void remove(Mail mail) {
+        removeAsync(mail.getName()).join();
     }
 
     @Override
     public void remove(Collection<Mail> toRemove) {
+        FluentFutureStream.of(toRemove.stream()
+            .map(Mail::getName)
+            .map(this::removeAsync))
+            .join();
     }
 
     @Override
     public void remove(String key) {
+        removeAsync(key).join();
+    }
+
+    public CompletableFuture<Void> removeAsync(String key) {
+        return CompletableFuture.allOf(
+            keysDAO.remove(url, key),
+            countDAO.decrement(url))
+            .thenCompose(any -> mailDAO.remove(url, key));
     }
 
     @Override
     public long size() throws MessagingException {
-        return 0;
+        return countDAO.getCount(url).join();
+    }
+
+    @Override
+    public void removeAll() throws MessagingException {
+        keysDAO.list(url)
+            .thenCompose(stream -> FluentFutureStream.of(stream.map(this::removeAsync))
+                .completableFuture())
+            .join();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
index 302c136..31abe57 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
@@ -19,23 +19,60 @@
 
 package org.apache.james.mailrepository.cassandra;
 
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.DockerCassandraExtension;
+import org.apache.james.backends.cassandra.init.CassandraModuleComposite;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.blob.cassandra.CassandraBlobId;
+import org.apache.james.blob.cassandra.CassandraBlobModule;
+import org.apache.james.blob.cassandra.CassandraBlobsDAO;
 import org.apache.james.mailrepository.MailRepositoryContract;
 import org.apache.james.mailrepository.api.MailRepository;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
-@Disabled("Skeleton implementation")
+@ExtendWith(DockerCassandraExtension.class)
 class CassandraMailRepositoryTest implements MailRepositoryContract {
+    static final String URL = "url";
+    static final CassandraBlobId.Factory BLOB_ID_FACTORY = new CassandraBlobId.Factory();
 
-    private CassandraMailRepository cassandraMailRepository;
+    CassandraMailRepository cassandraMailRepository;
+    CassandraCluster cassandra;
 
     @BeforeEach
-    void setup() {
-        cassandraMailRepository = new CassandraMailRepository();
+    void setup(DockerCassandraExtension.DockerCassandra dockerCassandra) {
+        cassandra = CassandraCluster.create(
+            new CassandraModuleComposite(
+                new CassandraMailRepositoryModule(),
+                new CassandraBlobModule()),
+            dockerCassandra.getIp(), dockerCassandra.getBindingPort());
+
+        CassandraMailRepositoryMailDAO mailDAO = new CassandraMailRepositoryMailDAO(cassandra.getConf(), BLOB_ID_FACTORY, cassandra.getTypesProvider());
+        CassandraMailRepositoryKeysDAO keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        CassandraMailRepositoryCountDAO countDAO = new CassandraMailRepositoryCountDAO(cassandra.getConf());
+        CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf());
+
+        cassandraMailRepository = new CassandraMailRepository(URL,
+            keysDAO, countDAO, mailDAO, blobsDAO);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        cassandra.close();
     }
 
     @Override
     public MailRepository retrieveRepository() {
         return cassandraMailRepository;
     }
+
+    @Test
+    @Disabled("key is unique in Cassandra")
+    @Override
+    public void sizeShouldBeIncrementedByOneWhenDuplicates() throws Exception {
+    }
+
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to