JAMES-2291 Implement CassandraMailRepository
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d281a05a Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d281a05a Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d281a05a Branch: refs/heads/master Commit: d281a05a6ab8960054d799a815292fd13d83e482 Parents: 0806d78 Author: benwa <[email protected]> Authored: Wed Jan 24 13:34:42 2018 +0700 Committer: benwa <[email protected]> Committed: Thu Jan 25 16:28:45 2018 +0700 ---------------------------------------------------------------------- .../mailbox/store/StoreMessageManager.java | 2 +- .../store/streaming/BodyOffsetInputStream.java | 159 ------------------- .../streaming/BodyOffsetInputStreamTest.java | 72 --------- .../james/util/BodyOffsetInputStream.java | 159 +++++++++++++++++++ .../james/util/BodyOffsetInputStreamTest.java | 72 +++++++++ .../mailrepository/MailRepositoryContract.java | 11 ++ .../mailrepository-cassandra/pom.xml | 15 +- .../cassandra/CassandraMailRepository.java | 157 +++++++++++++++++- .../cassandra/CassandraMailRepositoryTest.java | 45 +++++- 9 files changed, 446 insertions(+), 246 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java index 38491a1..704d881 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java @@ -72,7 +72,6 @@ import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; import org.apache.james.mailbox.store.quota.QuotaChecker; import org.apache.james.mailbox.store.search.MessageSearchIndex; -import org.apache.james.mailbox.store.streaming.BodyOffsetInputStream; import org.apache.james.mailbox.store.streaming.CountingInputStream; import org.apache.james.mime4j.MimeException; import org.apache.james.mime4j.message.DefaultBodyDescriptorBuilder; @@ -82,6 +81,7 @@ import org.apache.james.mime4j.stream.EntityState; import org.apache.james.mime4j.stream.MimeConfig; import org.apache.james.mime4j.stream.MimeTokenStream; import org.apache.james.mime4j.stream.RecursionMode; +import org.apache.james.util.BodyOffsetInputStream; import org.apache.james.util.IteratorWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStream.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStream.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStream.java deleted file mode 100644 index 96474a5..0000000 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStream.java +++ /dev/null @@ -1,159 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.mailbox.store.streaming; - -import java.io.IOException; -import java.io.InputStream; -import java.io.PushbackInputStream; - -/** - * {@link InputStream} which helps to keep track of the BodyOffset of the wrapped - * {@link InputStream} - * - * IMPORTANT: This class is not thread-safe! - * - */ -public class BodyOffsetInputStream extends InputStream { - private long count = 0; - private long bodyStartOctet = -1; - private final PushbackInputStream in; - private long readBytes = 0; - - public BodyOffsetInputStream(InputStream in) { - // we need to pushback at max 3 bytes - this.in = new PushbackInputStream(in, 3); - } - - /** - * @see java.io.InputStream#read() - */ - public int read() throws IOException { - int i = in.read(); - if (i != -1) { - readBytes++; - if (bodyStartOctet == -1 && i == 0x0D) { - int a = in.read(); - if (a == 0x0A) { - int b = in.read(); - - if (b == 0x0D) { - int c = in.read(); - - if (c == 0x0A) { - bodyStartOctet = count + 4; - } - in.unread(c); - } - in.unread(b); - } - in.unread(a); - } - count++; - } - return i; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if (bodyStartOctet == -1) { - return super.read(b, off, len); - } else { - int r = in.read(b, off, len); - if (r != -1) { - readBytes += r; - } - return r; - } - } - - @Override - public int read(byte[] b) throws IOException { - if (bodyStartOctet == -1) { - return super.read(b); - } else { - int r = in.read(b); - if (r != -1) { - readBytes += r; - } - return r; - } - } - - @Override - public int available() throws IOException { - return in.available(); - } - - @Override - public void close() throws IOException { - in.close(); - } - - @Override - public void mark(int readlimit) { - - } - - /** - * Mark is not supported by this implementation - */ - public boolean markSupported() { - return false; - } - - /** - * Throws {@link IOException} - */ - public void reset() throws IOException { - throw new IOException("Not supported"); - } - - @Override - public long skip(long n) throws IOException { - long i = 0; - while (i++ < n) { - if (read() == -1) { - break; - } - } - return i; - } - - /** - * Return the bodyStartOffset or -1 if it could not be found. - * Be aware you can only expect some valid result from the method - * if you have consumed the whole InputStream or if you are - * sure that you reached the body - * - * @return offset - */ - public long getBodyStartOffset() { - return bodyStartOctet; - } - - /** - * Return the read bytes so far - * - * @return readBytes - */ - public long getReadBytes() { - return readBytes; - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/mailbox/store/src/test/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStreamTest.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStreamTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStreamTest.java deleted file mode 100644 index 00b38c0..0000000 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/streaming/BodyOffsetInputStreamTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.mailbox.store.streaming; - -import static org.junit.Assert.assertEquals; - -import java.io.ByteArrayInputStream; -import java.io.IOException; - -import org.junit.Test; - -public class BodyOffsetInputStreamTest { - private String mail = "Subject: test\r\n\r\nbody"; - private long expectedOffset = 17; - private long bytes = mail.length(); - - @Test - public void testRead() throws IOException { - BodyOffsetInputStream in = new BodyOffsetInputStream(new ByteArrayInputStream(mail.getBytes())); - - while (in.read() != -1) { - // consume stream - } - assertEquals(expectedOffset, in.getBodyStartOffset()); - assertEquals(bytes, in.getReadBytes()); - in.close(); - } - - - @Test - public void testReadWithArray() throws IOException { - BodyOffsetInputStream in = new BodyOffsetInputStream(new ByteArrayInputStream(mail.getBytes())); - - byte[] b = new byte[8]; - while (in.read(b) != -1) { - // consume stream - } - assertEquals(expectedOffset, in.getBodyStartOffset()); - assertEquals(bytes, in.getReadBytes()); - in.close(); - } - - - @Test - public void testReadWithArrayBiggerThenStream() throws IOException { - BodyOffsetInputStream in = new BodyOffsetInputStream(new ByteArrayInputStream(mail.getBytes())); - - byte[] b = new byte[4096]; - while (in.read(b) != -1) { - // consume stream - } - assertEquals(expectedOffset, in.getBodyStartOffset()); - assertEquals(bytes, in.getReadBytes()); - in.close(); - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/server/container/util/src/main/java/org/apache/james/util/BodyOffsetInputStream.java ---------------------------------------------------------------------- diff --git a/server/container/util/src/main/java/org/apache/james/util/BodyOffsetInputStream.java b/server/container/util/src/main/java/org/apache/james/util/BodyOffsetInputStream.java new file mode 100644 index 0000000..ddb4dfe --- /dev/null +++ b/server/container/util/src/main/java/org/apache/james/util/BodyOffsetInputStream.java @@ -0,0 +1,159 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.PushbackInputStream; + +/** + * {@link InputStream} which helps to keep track of the BodyOffset of the wrapped + * {@link InputStream} + * + * IMPORTANT: This class is not thread-safe! + * + */ +public class BodyOffsetInputStream extends InputStream { + private long count = 0; + private long bodyStartOctet = -1; + private final PushbackInputStream in; + private long readBytes = 0; + + public BodyOffsetInputStream(InputStream in) { + // we need to pushback at max 3 bytes + this.in = new PushbackInputStream(in, 3); + } + + /** + * @see InputStream#read() + */ + public int read() throws IOException { + int i = in.read(); + if (i != -1) { + readBytes++; + if (bodyStartOctet == -1 && i == 0x0D) { + int a = in.read(); + if (a == 0x0A) { + int b = in.read(); + + if (b == 0x0D) { + int c = in.read(); + + if (c == 0x0A) { + bodyStartOctet = count + 4; + } + in.unread(c); + } + in.unread(b); + } + in.unread(a); + } + count++; + } + return i; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (bodyStartOctet == -1) { + return super.read(b, off, len); + } else { + int r = in.read(b, off, len); + if (r != -1) { + readBytes += r; + } + return r; + } + } + + @Override + public int read(byte[] b) throws IOException { + if (bodyStartOctet == -1) { + return super.read(b); + } else { + int r = in.read(b); + if (r != -1) { + readBytes += r; + } + return r; + } + } + + @Override + public int available() throws IOException { + return in.available(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public void mark(int readlimit) { + + } + + /** + * Mark is not supported by this implementation + */ + public boolean markSupported() { + return false; + } + + /** + * Throws {@link IOException} + */ + public void reset() throws IOException { + throw new IOException("Not supported"); + } + + @Override + public long skip(long n) throws IOException { + long i = 0; + while (i++ < n) { + if (read() == -1) { + break; + } + } + return i; + } + + /** + * Return the bodyStartOffset or -1 if it could not be found. + * Be aware you can only expect some valid result from the method + * if you have consumed the whole InputStream or if you are + * sure that you reached the body + * + * @return offset + */ + public long getBodyStartOffset() { + return bodyStartOctet; + } + + /** + * Return the read bytes so far + * + * @return readBytes + */ + public long getReadBytes() { + return readBytes; + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/server/container/util/src/test/java/org/apache/james/util/BodyOffsetInputStreamTest.java ---------------------------------------------------------------------- diff --git a/server/container/util/src/test/java/org/apache/james/util/BodyOffsetInputStreamTest.java b/server/container/util/src/test/java/org/apache/james/util/BodyOffsetInputStreamTest.java new file mode 100644 index 0000000..645d337 --- /dev/null +++ b/server/container/util/src/test/java/org/apache/james/util/BodyOffsetInputStreamTest.java @@ -0,0 +1,72 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.util; + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import org.junit.Test; + +public class BodyOffsetInputStreamTest { + private String mail = "Subject: test\r\n\r\nbody"; + private long expectedOffset = 17; + private long bytes = mail.length(); + + @Test + public void testRead() throws IOException { + BodyOffsetInputStream in = new BodyOffsetInputStream(new ByteArrayInputStream(mail.getBytes())); + + while (in.read() != -1) { + // consume stream + } + assertEquals(expectedOffset, in.getBodyStartOffset()); + assertEquals(bytes, in.getReadBytes()); + in.close(); + } + + + @Test + public void testReadWithArray() throws IOException { + BodyOffsetInputStream in = new BodyOffsetInputStream(new ByteArrayInputStream(mail.getBytes())); + + byte[] b = new byte[8]; + while (in.read(b) != -1) { + // consume stream + } + assertEquals(expectedOffset, in.getBodyStartOffset()); + assertEquals(bytes, in.getReadBytes()); + in.close(); + } + + + @Test + public void testReadWithArrayBiggerThenStream() throws IOException { + BodyOffsetInputStream in = new BodyOffsetInputStream(new ByteArrayInputStream(mail.getBytes())); + + byte[] b = new byte[4096]; + while (in.read(b) != -1) { + // consume stream + } + assertEquals(expectedOffset, in.getBodyStartOffset()); + assertEquals(bytes, in.getReadBytes()); + in.close(); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/server/mailrepository/mailrepository-api/src/test/java/org/apache/james/mailrepository/MailRepositoryContract.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-api/src/test/java/org/apache/james/mailrepository/MailRepositoryContract.java b/server/mailrepository/mailrepository-api/src/test/java/org/apache/james/mailrepository/MailRepositoryContract.java index d52c8cc..e6c9fc1 100644 --- a/server/mailrepository/mailrepository-api/src/test/java/org/apache/james/mailrepository/MailRepositoryContract.java +++ b/server/mailrepository/mailrepository-api/src/test/java/org/apache/james/mailrepository/MailRepositoryContract.java @@ -172,6 +172,17 @@ public interface MailRepositoryContract { } @Test + default void retrieveShouldReturnNullAfterRemoveAll() throws Exception { + MailRepository testee = retrieveRepository(); + String name = "name"; + testee.store(createMail(name)); + + testee.removeAll(); + + assertThat(testee.retrieve(name)).isNull(); + } + + @Test default void removeAllShouldBeIdempotent() throws Exception { MailRepository testee = retrieveRepository(); testee.store(createMail("name")); http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/server/mailrepository/mailrepository-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/pom.xml b/server/mailrepository/mailrepository-cassandra/pom.xml index d55228d..fde14ef 100644 --- a/server/mailrepository/mailrepository-cassandra/pom.xml +++ b/server/mailrepository/mailrepository-cassandra/pom.xml @@ -49,6 +49,11 @@ </dependency> <dependency> <groupId>${project.groupId}</groupId> + <artifactId>blob-cassandra</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> <artifactId>blob-api</artifactId> <type>test-jar</type> <scope>test</scope> @@ -58,11 +63,6 @@ <artifactId>james-server-core</artifactId> </dependency> <dependency> - <groupId>org.apache.james</groupId> - <artifactId>james-server-testing</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>${project.groupId}</groupId> <artifactId>james-server-mailrepository-api</artifactId> </dependency> @@ -73,6 +73,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>james-server-testing</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java index b1f171e..9fd5d3f 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java @@ -19,48 +19,195 @@ package org.apache.james.mailrepository.cassandra; +import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Iterator; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; import javax.mail.MessagingException; +import javax.mail.Session; +import javax.mail.internet.MimeMessage; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.ObjectStore; import org.apache.james.mailrepository.api.MailRepository; +import org.apache.james.util.BodyOffsetInputStream; +import org.apache.james.util.CompletableFutureUtil; +import org.apache.james.util.FluentFutureStream; import org.apache.mailet.Mail; +import com.github.fge.lambdas.Throwing; +import com.google.common.base.Throwables; +import com.google.common.primitives.Bytes; + public class CassandraMailRepository implements MailRepository { - public CassandraMailRepository() { + private final String url; + private final CassandraMailRepositoryKeysDAO keysDAO; + private final CassandraMailRepositoryCountDAO countDAO; + private final CassandraMailRepositoryMailDAO mailDAO; + private final ObjectStore objectStore; + + public CassandraMailRepository(String url, CassandraMailRepositoryKeysDAO keysDAO, CassandraMailRepositoryCountDAO countDAO, CassandraMailRepositoryMailDAO mailDAO, ObjectStore objectStore) { + this.url = url; + this.keysDAO = keysDAO; + this.countDAO = countDAO; + this.mailDAO = mailDAO; + this.objectStore = objectStore; } @Override - public void store(Mail mail) { + public void store(Mail mail) throws MessagingException { + try { + Pair<byte[], byte[]> splitHeaderBody = splitHeaderBody(mail.getMessage()); + + CompletableFuture<Pair<BlobId, BlobId>> blobIds = CompletableFutureUtil.combine( + objectStore.save(splitHeaderBody.getLeft()), + objectStore.save(splitHeaderBody.getRight()), + Pair::of); + + blobIds.thenCompose(Throwing.function(pair -> + mailDAO.store(url, mail, pair.getLeft(), pair.getRight()))) + .thenCompose(any -> CompletableFuture.allOf( + countDAO.increment(url), + keysDAO.store(url, mail.getName()))) + .join(); + } catch (IOException e) { + throw new MessagingException("Exception while storing mail", e); + } + } + + public Pair<byte[], byte[]> splitHeaderBody(MimeMessage message) throws IOException, MessagingException { + byte[] messageAsArray = messageToArray(message); + int bodyStartOctet = computeBodyStartOctet(messageAsArray); + + return Pair.of( + getHeaderBytes(messageAsArray, bodyStartOctet), + getBodyBytes(messageAsArray, bodyStartOctet)); + } + + public byte[] messageToArray(MimeMessage message) throws IOException, MessagingException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + message.writeTo(byteArrayOutputStream); + return byteArrayOutputStream.toByteArray(); + } + + public byte[] getHeaderBytes(byte[] messageContentAsArray, int bodyStartOctet) { + ByteBuffer headerContent = ByteBuffer.wrap(messageContentAsArray, 0, bodyStartOctet); + byte[] headerBytes = new byte[bodyStartOctet]; + headerContent.get(headerBytes); + return headerBytes; + } + + public byte[] getBodyBytes(byte[] messageContentAsArray, int bodyStartOctet) { + if (bodyStartOctet < messageContentAsArray.length) { + ByteBuffer bodyContent = ByteBuffer.wrap(messageContentAsArray, + bodyStartOctet, + messageContentAsArray.length - bodyStartOctet); + byte[] bodyBytes = new byte[messageContentAsArray.length - bodyStartOctet]; + bodyContent.get(bodyBytes); + return bodyBytes; + } else { + return new byte[] {}; + } + } + + public int computeBodyStartOctet(byte[] messageAsArray) throws IOException { + try (BodyOffsetInputStream bodyOffsetInputStream = + new BodyOffsetInputStream(new ByteArrayInputStream(messageAsArray))) { + consume(bodyOffsetInputStream); + + if (bodyOffsetInputStream.getBodyStartOffset() == -1) { + return 0; + } + return (int) bodyOffsetInputStream.getBodyStartOffset(); + } + } + + private void consume(InputStream in) throws IOException { + IOUtils.copy(in, NULL_OUTPUT_STREAM); } @Override public Iterator<String> list() { - return null; + return keysDAO.list(url) + .join() + .iterator(); } @Override public Mail retrieve(String key) { - return null; + return CompletableFutureUtil + .unwrap(mailDAO.read(url, key) + .thenApply(optional -> optional.map(this::toMail))) + .join() + .orElse(null); + } + + public CompletableFuture<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) { + return CompletableFutureUtil.combine( + objectStore.read(mailDTO.getHeaderBlobId()), + objectStore.read(mailDTO.getBodyBlobId()), + Bytes::concat) + .thenApply(this::toMimeMessage) + .thenApply(mimeMessage -> mailDTO.getMailBuilder() + .mimeMessage(mimeMessage) + .build()); + } + + public MimeMessage toMimeMessage(byte[] bytes) { + try { + return new MimeMessage(Session.getInstance(new Properties()), new ByteArrayInputStream(bytes)); + } catch (MessagingException e) { + throw Throwables.propagate(e); + } } @Override public void remove(Mail mail) { + removeAsync(mail.getName()).join(); } @Override public void remove(Collection<Mail> toRemove) { + FluentFutureStream.of(toRemove.stream() + .map(Mail::getName) + .map(this::removeAsync)) + .join(); } @Override public void remove(String key) { + removeAsync(key).join(); + } + + public CompletableFuture<Void> removeAsync(String key) { + return CompletableFuture.allOf( + keysDAO.remove(url, key), + countDAO.decrement(url)) + .thenCompose(any -> mailDAO.remove(url, key)); } @Override public long size() throws MessagingException { - return 0; + return countDAO.getCount(url).join(); + } + + @Override + public void removeAll() throws MessagingException { + keysDAO.list(url) + .thenCompose(stream -> FluentFutureStream.of(stream.map(this::removeAsync)) + .completableFuture()) + .join(); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/d281a05a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java index 302c136..31abe57 100644 --- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java +++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java @@ -19,23 +19,60 @@ package org.apache.james.mailrepository.cassandra; +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.DockerCassandraExtension; +import org.apache.james.backends.cassandra.init.CassandraModuleComposite; +import org.apache.james.backends.cassandra.utils.CassandraUtils; +import org.apache.james.blob.cassandra.CassandraBlobId; +import org.apache.james.blob.cassandra.CassandraBlobModule; +import org.apache.james.blob.cassandra.CassandraBlobsDAO; import org.apache.james.mailrepository.MailRepositoryContract; import org.apache.james.mailrepository.api.MailRepository; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; -@Disabled("Skeleton implementation") +@ExtendWith(DockerCassandraExtension.class) class CassandraMailRepositoryTest implements MailRepositoryContract { + static final String URL = "url"; + static final CassandraBlobId.Factory BLOB_ID_FACTORY = new CassandraBlobId.Factory(); - private CassandraMailRepository cassandraMailRepository; + CassandraMailRepository cassandraMailRepository; + CassandraCluster cassandra; @BeforeEach - void setup() { - cassandraMailRepository = new CassandraMailRepository(); + void setup(DockerCassandraExtension.DockerCassandra dockerCassandra) { + cassandra = CassandraCluster.create( + new CassandraModuleComposite( + new CassandraMailRepositoryModule(), + new CassandraBlobModule()), + dockerCassandra.getIp(), dockerCassandra.getBindingPort()); + + CassandraMailRepositoryMailDAO mailDAO = new CassandraMailRepositoryMailDAO(cassandra.getConf(), BLOB_ID_FACTORY, cassandra.getTypesProvider()); + CassandraMailRepositoryKeysDAO keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION); + CassandraMailRepositoryCountDAO countDAO = new CassandraMailRepositoryCountDAO(cassandra.getConf()); + CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf()); + + cassandraMailRepository = new CassandraMailRepository(URL, + keysDAO, countDAO, mailDAO, blobsDAO); + } + + @AfterEach + public void tearDown() { + cassandra.close(); } @Override public MailRepository retrieveRepository() { return cassandraMailRepository; } + + @Test + @Disabled("key is unique in Cassandra") + @Override + public void sizeShouldBeIncrementedByOneWhenDuplicates() throws Exception { + } + } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
