JAMES-2082 Proposal for a new organisation of blobs
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e9979b56 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e9979b56 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e9979b56 Branch: refs/heads/master Commit: e9979b56e03f362c094c72de3ff0ee0f020b5373 Parents: bd69ab9 Author: Luc DUZAN <ldu...@linagora.com> Authored: Thu Jul 6 17:57:03 2017 +0200 Committer: Antoine Duprat <adup...@linagora.com> Committed: Mon Jul 10 14:23:56 2017 +0200 ---------------------------------------------------------------------- .../james/mailbox/cassandra/ids/PartId.java | 72 --------------- .../cassandra/mail/CassandraBlobsDAO.java | 94 ++++++++++---------- .../cassandra/modules/CassandraBlobModule.java | 21 +++-- .../mailbox/cassandra/table/BlobTable.java | 7 +- .../table/CassandraMessageV2Table.java | 12 --- .../james/mailbox/cassandra/ids/PartIdTest.java | 87 ------------------ 6 files changed, 60 insertions(+), 233 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/e9979b56/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/PartId.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/PartId.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/PartId.java deleted file mode 100644 index 38715d0..0000000 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/PartId.java +++ /dev/null @@ -1,72 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. 
The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.mailbox.cassandra.ids; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; - -public class PartId { - public static PartId create(BlobId blobId, int position) { - Preconditions.checkNotNull(blobId); - Preconditions.checkArgument(position >= 0, "Position needs to be positive"); - return new PartId(blobId.getId() + "-" + position); - } - - public static PartId from(String id) { - Preconditions.checkArgument(!Strings.isNullOrEmpty(id)); - return new PartId(id); - } - - private final String id; - - @VisibleForTesting - PartId(String id) { - this.id = id; - } - - public String getId() { - return id; - } - - @Override - public final boolean equals(Object obj) { - if (obj instanceof PartId) { - PartId other = (PartId) obj; - return Objects.equal(id, other.id); - } - return false; - } - - @Override - public final int hashCode() { - return Objects.hashCode(id); - } - - @Override - public String toString() { - return MoreObjects - .toStringHelper(this) - .add("id", id) - .toString(); - } -} 
http://git-wip-us.apache.org/repos/asf/james-project/blob/e9979b56/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java index 55efcda..e73fd32 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java @@ -23,36 +23,31 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; -import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BlobParts; -import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Blobs; import java.nio.ByteBuffer; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; import java.util.stream.Stream; import javax.inject.Inject; import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; -import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.apache.james.mailbox.cassandra.ids.BlobId; -import org.apache.james.mailbox.cassandra.ids.PartId; import org.apache.james.mailbox.cassandra.mail.utils.DataChunker; -import org.apache.james.util.CompletableFutureUtil; +import org.apache.james.mailbox.cassandra.table.BlobTable; +import org.apache.james.mailbox.cassandra.table.BlobTable.BlobParts; import org.apache.james.util.FluentFutureStream; import org.apache.james.util.OptionalConverter; 
import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.primitives.Bytes; - public class CassandraBlobsDAO { public static final int CHUNK_SIZE = 1024 * 100; @@ -76,26 +71,27 @@ public class CassandraBlobsDAO { private PreparedStatement prepareSelect(Session session) { return session.prepare(select() - .from(Blobs.TABLE_NAME) - .where(eq(Blobs.ID, bindMarker(Blobs.ID)))); + .from(BlobTable.TABLE_NAME) + .where(eq(BlobTable.ID, bindMarker(BlobTable.ID)))); } private PreparedStatement prepareSelectPart(Session session) { return session.prepare(select() .from(BlobParts.TABLE_NAME) - .where(eq(BlobParts.ID, bindMarker(BlobParts.ID)))); + .where(eq(BlobTable.ID, bindMarker(BlobTable.ID))) + .and(eq(BlobParts.CHUNK_NUMBER, bindMarker(BlobParts.CHUNK_NUMBER)))); } private PreparedStatement prepareInsert(Session session) { - return session.prepare(insertInto(Blobs.TABLE_NAME) - .value(Blobs.ID, bindMarker(Blobs.ID)) - .value(Blobs.POSITION, bindMarker(Blobs.POSITION)) - .value(Blobs.PART, bindMarker(Blobs.PART))); + return session.prepare(insertInto(BlobTable.TABLE_NAME) + .value(BlobTable.ID, bindMarker(BlobTable.ID)) + .value(BlobTable.NUMBER_OF_CHUNK, bindMarker(BlobTable.NUMBER_OF_CHUNK))); } private PreparedStatement prepareInsertPart(Session session) { return session.prepare(insertInto(BlobParts.TABLE_NAME) - .value(BlobParts.ID, bindMarker(BlobParts.ID)) + .value(BlobTable.ID, bindMarker(BlobTable.ID)) + .value(BlobParts.CHUNK_NUMBER, bindMarker(BlobParts.CHUNK_NUMBER)) .value(BlobParts.DATA, bindMarker(BlobParts.DATA))); } @@ -105,55 +101,57 @@ public class CassandraBlobsDAO { } BlobId blobId = BlobId.forPayload(data); return saveBlobParts(data, blobId) - .thenCompose(partIds -> 
saveBlobPartsReferences(blobId, partIds)) + .thenCompose(numberOfChunk-> saveBlobPartsReferences(blobId, numberOfChunk)) .thenApply(any -> Optional.of(blobId)); } - private CompletableFuture<Stream<Pair<Integer, PartId>>> saveBlobParts(byte[] data, BlobId blobId) { + private CompletableFuture<Integer> saveBlobParts(byte[] data, BlobId blobId) { return FluentFutureStream.of( dataChunker.chunk(data, CHUNK_SIZE) .map(pair -> writePart(pair.getRight(), blobId, pair.getKey()) .thenApply(partId -> Pair.of(pair.getKey(), partId)))) - .completableFuture(); + .completableFuture() + .thenApply(stream -> + getLastOfStream(stream) + .map(numOfChunkAndPartId -> numOfChunkAndPartId.getLeft() + 1) + .orElse(0)); + } + + private static <T> Optional<T> getLastOfStream(Stream<T> stream) { + return stream.reduce((first, second) -> second); } - private CompletableFuture<PartId> writePart(ByteBuffer data, BlobId blobId, int position) { - PartId partId = PartId.create(blobId, position); + private CompletableFuture<Void> writePart(ByteBuffer data, BlobId blobId, int position) { return cassandraAsyncExecutor.executeVoid( insertPart.bind() - .setString(BlobParts.ID, partId.getId()) - .setBytes(BlobParts.DATA, data)) - .thenApply(any -> partId); + .setString(BlobTable.ID, blobId.getId()) + .setInt(BlobParts.CHUNK_NUMBER, position) + .setBytes(BlobParts.DATA, data)); } - private CompletableFuture<Stream<Void>> saveBlobPartsReferences(BlobId blobId, Stream<Pair<Integer, PartId>> stream) { - return FluentFutureStream.of(stream.map(pair -> - cassandraAsyncExecutor.executeVoid(insert.bind() - .setString(Blobs.ID, blobId.getId()) - .setLong(Blobs.POSITION, pair.getKey()) - .setString(Blobs.PART, pair.getValue().getId())))) - .completableFuture(); + private CompletableFuture<Void> saveBlobPartsReferences(BlobId blobId, int numberOfChunk) { + return cassandraAsyncExecutor.executeVoid(insert.bind() + .setString(BlobTable.ID, blobId.getId()) + .setInt(BlobTable.NUMBER_OF_CHUNK, numberOfChunk)); } 
public CompletableFuture<byte[]> read(BlobId blobId) { - return cassandraAsyncExecutor.execute( + return cassandraAsyncExecutor.executeSingleRow( select.bind() - .setString(Blobs.ID, blobId.getId())) - .thenApply(this::toPartIds) + .setString(BlobTable.ID, blobId.getId())) .thenCompose(this::toDataParts) .thenApply(this::concatenateDataParts); } - private ImmutableMap<Long, PartId> toPartIds(ResultSet resultSet) { - return CassandraUtils.convertToStream(resultSet) - .map(row -> Pair.of(row.getLong(Blobs.POSITION), PartId.from(row.getString(Blobs.PART)))) - .collect(Guavate.toImmutableMap(Pair::getKey, Pair::getValue)); - } - - private CompletableFuture<Stream<Optional<Row>>> toDataParts(ImmutableMap<Long, PartId> positionToIds) { - return CompletableFutureUtil.chainAll( - positionToIds.values().stream(), - this::readPart); + private CompletableFuture<Stream<Optional<Row>>> toDataParts(Optional<Row> blobRowOptional) { + return blobRowOptional.map(blobRow -> { + BlobId blobId = BlobId.from(blobRow.getString(BlobTable.ID)); + int numOfChunk = blobRow.getInt(BlobTable.NUMBER_OF_CHUNK); + return FluentFutureStream.of( + IntStream.range(0, numOfChunk) + .mapToObj(position -> readPart(blobId, position))) + .completableFuture(); + }).orElse(CompletableFuture.completedFuture(Stream.empty())); } private byte[] concatenateDataParts(Stream<Optional<Row>> rows) { @@ -170,8 +168,10 @@ public class CassandraBlobsDAO { return data; } - private CompletableFuture<Optional<Row>> readPart(PartId partId) { - return cassandraAsyncExecutor.executeSingleRow(selectPart.bind() - .setString(BlobParts.ID, partId.getId())); + private CompletableFuture<Optional<Row>> readPart(BlobId blobId, int position) { + return cassandraAsyncExecutor.executeSingleRow( + selectPart.bind() + .setString(BlobTable.ID, blobId.getId()) + .setInt(BlobParts.CHUNK_NUMBER, position)); } } 
http://git-wip-us.apache.org/repos/asf/james-project/blob/e9979b56/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java index 4eea870..a8a382b 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java @@ -24,12 +24,11 @@ import java.util.List; import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.components.CassandraTable; import org.apache.james.backends.cassandra.components.CassandraType; -import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BlobParts; -import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Blobs; import com.datastax.driver.core.DataType; import com.datastax.driver.core.schemabuilder.SchemaBuilder; import com.google.common.collect.ImmutableList; +import org.apache.james.mailbox.cassandra.table.BlobTable; public class CassandraBlobModule implements CassandraModule { @@ -38,17 +37,17 @@ public class CassandraBlobModule implements CassandraModule { public CassandraBlobModule() { tables = ImmutableList.of( - new CassandraTable(Blobs.TABLE_NAME, - SchemaBuilder.createTable(Blobs.TABLE_NAME) + new CassandraTable(BlobTable.TABLE_NAME, + SchemaBuilder.createTable(BlobTable.TABLE_NAME) .ifNotExists() - .addPartitionKey(Blobs.ID, DataType.text()) - .addClusteringColumn(Blobs.POSITION, DataType.bigint()) - .addColumn(Blobs.PART, DataType.text())), - new CassandraTable(BlobParts.TABLE_NAME, - SchemaBuilder.createTable(BlobParts.TABLE_NAME) + .addPartitionKey(BlobTable.ID, 
DataType.text()) + .addClusteringColumn(BlobTable.NUMBER_OF_CHUNK, DataType.cint())), + new CassandraTable(BlobTable.BlobParts.TABLE_NAME, + SchemaBuilder.createTable(BlobTable.BlobParts.TABLE_NAME) .ifNotExists() - .addPartitionKey(BlobParts.ID, DataType.text()) - .addColumn(BlobParts.DATA, DataType.blob()))); + .addPartitionKey(BlobTable.ID, DataType.text()) + .addClusteringColumn(BlobTable.BlobParts.CHUNK_NUMBER, DataType.cint()) + .addColumn(BlobTable.BlobParts.DATA, DataType.blob()))); types = ImmutableList.of(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/e9979b56/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java index be097a5..1c0b0e9 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java @@ -19,15 +19,14 @@ package org.apache.james.mailbox.cassandra.table; -public interface BlobsTable { +public interface BlobTable { String TABLE_NAME = "blobs"; String ID = "id"; - String POSITION = "position"; - String PART = "part"; + String NUMBER_OF_CHUNK = "position"; interface BlobParts { String TABLE_NAME = "blobParts"; - String ID = "id"; + String CHUNK_NUMBER = "chunkNumber"; String DATA = "data"; } } http://git-wip-us.apache.org/repos/asf/james-project/blob/e9979b56/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java index f7bc698..c2421f3 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java @@ -52,16 +52,4 @@ public interface CassandraMessageV2Table { String IS_INLINE = "isInline"; } - interface Blobs { - String TABLE_NAME = "blobs"; - String ID = "id"; - String POSITION = "position"; - String PART = "part"; - } - - interface BlobParts { - String TABLE_NAME = "blobParts"; - String ID = "id"; - String DATA = "data"; - } } http://git-wip-us.apache.org/repos/asf/james-project/blob/e9979b56/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java deleted file mode 100644 index b236c55..0000000 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. 
You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.mailbox.cassandra.ids; - -import static org.assertj.core.api.Assertions.assertThat; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import nl.jqno.equalsverifier.EqualsVerifier; - -public class PartIdTest { - private static final BlobId BLOB_ID = BlobId.from("abc"); - - @Rule - public ExpectedException expectedException = ExpectedException.none(); - - @Test - public void shouldRespectBeanContract() { - EqualsVerifier.forClass(PartId.class).verify(); - } - - @Test - public void test () { - String id = "111"; - assertThat(PartId.from(id)) - .isEqualTo(new PartId(id)); - } - - @Test - public void fromShouldThrowOnNull() { - expectedException.expect(IllegalArgumentException.class); - - PartId.from(null); - } - - @Test - public void fromShouldThrowOnEmpty() { - expectedException.expect(IllegalArgumentException.class); - - PartId.from(""); - } - - @Test - public void createShouldThrowOnNullBlobId() { - expectedException.expect(NullPointerException.class); - - PartId.create(null, 1); - } - - @Test - public void createShouldThrowOnNegativePosition() { - expectedException.expect(IllegalArgumentException.class); - - PartId.create(BLOB_ID, -1); - } - - @Test - public void createShouldAcceptPositionZero() { - assertThat(PartId.create(BLOB_ID, 0).getId()) - .isEqualTo(BLOB_ID.getId() + "-0"); - } - - @Test - public void createShouldConcatenateBlobIdAndPosition() { - assertThat(PartId.create(BLOB_ID, 
36).getId()) - .isEqualTo(BLOB_ID.getId() + "-36"); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org