JAMES-2122 Adding a log on swallowed blobs
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/8c2eddf9 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/8c2eddf9 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/8c2eddf9 Branch: refs/heads/master Commit: 8c2eddf9402de6002d210a37b2333a3a04f79768 Parents: 9507ba7 Author: benwa <btell...@linagora.com> Authored: Thu Aug 17 10:19:20 2017 +0700 Committer: Matthieu Baechler <matth...@apache.org> Committed: Thu Aug 17 13:13:33 2017 +0200 ---------------------------------------------------------------------- .../cassandra/mail/CassandraBlobsDAO.java | 47 +++++++++++++++----- 1 file changed, 36 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/8c2eddf9/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java index f6e3d21..819d427 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java @@ -41,16 +41,20 @@ import org.apache.james.mailbox.cassandra.table.BlobTable; import org.apache.james.mailbox.cassandra.table.BlobTable.BlobParts; import org.apache.james.util.FluentFutureStream; import org.apache.james.util.OptionalConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Bytes; public class CassandraBlobsDAO { + private static final Logger LOGGER = LoggerFactory.getLogger(CassandraBlobsDAO.class); private final CassandraAsyncExecutor cassandraAsyncExecutor; private final PreparedStatement insert; private final PreparedStatement insertPart; @@ -113,7 +117,7 @@ public class CassandraBlobsDAO { } private CompletableFuture<Integer> saveBlobParts(byte[] data, BlobId blobId) { - return FluentFutureStream.<Pair<Integer, Void>> of( + return FluentFutureStream.of( dataChunker.chunk(data, configuration.getBlobPartSize()) .map(pair -> writePart(pair.getRight(), blobId, pair.getKey()) .thenApply(partId -> Pair.of(pair.getKey(), partId)))) @@ -146,23 +150,29 @@ public class CassandraBlobsDAO { return cassandraAsyncExecutor.executeSingleRow( select.bind() .setString(BlobTable.ID, blobId.getId())) - .thenCompose(this::toDataParts) + .thenCompose(row -> toDataParts(row, blobId)) .thenApply(this::concatenateDataParts); } - private CompletableFuture<Stream<Optional<Row>>> toDataParts(Optional<Row> blobRowOptional) { + private CompletableFuture<Stream<BlobPart>> toDataParts(Optional<Row> blobRowOptional, BlobId blobId) { return blobRowOptional.map(blobRow -> { - BlobId blobId = BlobId.from(blobRow.getString(BlobTable.ID)); int numOfChunk = blobRow.getInt(BlobTable.NUMBER_OF_CHUNK); return FluentFutureStream.of( IntStream.range(0, numOfChunk) .mapToObj(position -> readPart(blobId, position))) .completableFuture(); - }).orElse(CompletableFuture.completedFuture(Stream.empty())); - } - - private byte[] concatenateDataParts(Stream<Optional<Row>> rows) { - ImmutableList<byte[]> parts = rows.flatMap(OptionalConverter::toStream) + }).orElseGet(() -> { + LOGGER.warn("Could not retrieve blob metadata for {}", blobId); + return CompletableFuture.completedFuture(Stream.empty()); + }); + } + + private byte[] concatenateDataParts(Stream<BlobPart> blobParts) { + ImmutableList<byte[]> parts = blobParts + .map(blobPart -> OptionalConverter.ifEmpty( + blobPart.row, + () -> LOGGER.warn("Missing blob part for blobId {} and position {}", blobPart.blobId, blobPart.position))) + .flatMap(OptionalConverter::toStream) .map(this::rowToData) .collect(Guavate.toImmutableList()); @@ -175,10 +185,25 @@ public class CassandraBlobsDAO { return data; } - private CompletableFuture<Optional<Row>> readPart(BlobId blobId, int position) { + private CompletableFuture<BlobPart> readPart(BlobId blobId, int position) { return cassandraAsyncExecutor.executeSingleRow( selectPart.bind() .setString(BlobTable.ID, blobId.getId()) - .setInt(BlobParts.CHUNK_NUMBER, position)); + .setInt(BlobParts.CHUNK_NUMBER, position)) + .thenApply(row -> new BlobPart(blobId, position, row)); + } + + private static class BlobPart { + private final BlobId blobId; + private final int position; + private final Optional<Row> row; + + public BlobPart(BlobId blobId, int position, Optional<Row> row) { + Preconditions.checkNotNull(blobId); + Preconditions.checkArgument(position >= 0, "position need to be positive"); + this.blobId = blobId; + this.position = position; + this.row = row; + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org