This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 625fec0a4af1fcde0e8429f78cd9e1a71a73030f Author: Rémi KOWALSKI <rkowal...@linagora.com> AuthorDate: Thu Jan 9 17:47:51 2020 +0100 JAMES-3028 Dumb implementation for Cassandra BlobStore --- .../blob/cassandra/CassandraDumbBlobStore.java | 208 +++++++++++++++++++++ .../blob/cassandra/CassandraDumbBlobStoreTest.java | 62 ++++++ 2 files changed, 270 insertions(+) diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java new file mode 100644 index 0000000..1ac59f6 --- /dev/null +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java @@ -0,0 +1,208 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.cassandra; + +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.inject.Inject; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.DumbBlobStore; +import org.apache.james.blob.api.ObjectNotFoundException; +import org.apache.james.blob.api.ObjectStoreIOException; +import org.apache.james.blob.cassandra.utils.DataChunker; +import org.apache.james.util.ReactorUtils; + +import com.github.fge.lambdas.Throwing; +import com.google.common.base.Preconditions; +import com.google.common.io.ByteSource; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class CassandraDumbBlobStore implements DumbBlobStore { + + public static final String DEFAULT_BUCKET = "cassandraDefault"; + public static final boolean LAZY = false; + + private final CassandraDefaultBucketDAO defaultBucketDAO; + private final CassandraBucketDAO bucketDAO; + private final DataChunker dataChunker; + private final CassandraConfiguration configuration; + private final HashBlobId.Factory blobIdFactory; + private final BucketName defaultBucket; + + @Inject + CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO, + CassandraBucketDAO bucketDAO, + CassandraConfiguration cassandraConfiguration, + HashBlobId.Factory blobIdFactory, + BucketName defaultBucket) { + this.defaultBucketDAO = defaultBucketDAO; + this.bucketDAO = bucketDAO; + this.configuration = cassandraConfiguration; + this.blobIdFactory = blobIdFactory; + this.defaultBucket = defaultBucket; + this.dataChunker = new DataChunker(); + } + + @Override + public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { + return ReactorUtils.toInputStream(readBlobParts(bucketName, blobId)); + } + + @Override + public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { + return readBlobParts(bucketName, blobId) + .collectList() + .map(this::byteBuffersToBytesArray); + } + + @Override + public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) { + Preconditions.checkNotNull(data); + + return Mono.fromCallable(() -> dataChunker.chunk(data, configuration.getBlobPartSize())) + .flatMap(chunks -> save(bucketName, blobId, chunks)); + } + + @Override + public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) { + Preconditions.checkNotNull(bucketName); + Preconditions.checkNotNull(inputStream); + + return Mono.fromCallable(() -> dataChunker.chunkStream(inputStream, configuration.getBlobPartSize())) + .flatMap(chunks -> save(bucketName, blobId, chunks)) + .onErrorMap(e -> new ObjectStoreIOException("Exception occurred while saving input stream", e)); + } + + @Override + public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) { + return Mono.using(content::openBufferedStream, + stream -> save(bucketName, blobId, stream), + Throwing.consumer(InputStream::close).sneakyThrow(), + LAZY); + } + + private Mono<Void> save(BucketName bucketName, BlobId blobId, Flux<ByteBuffer> chunksAsFlux) { + return saveBlobParts(bucketName, blobId, chunksAsFlux) + .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, blobId, numberOfChunk)); + } + + private Mono<Integer> saveBlobParts(BucketName bucketName, BlobId blobId, Flux<ByteBuffer> chunksAsFlux) { + return chunksAsFlux + .index() + .concatMap(pair -> writePart(bucketName, blobId, pair.getT1().intValue(), pair.getT2())) + .count() + .map(Long::intValue); + } + + private Mono<?> writePart(BucketName bucketName, BlobId blobId, int position, ByteBuffer data) { + Mono<?> write; + if (isDefaultBucket(bucketName)) { + write = defaultBucketDAO.writePart(data, blobId, position); + } else { + write = bucketDAO.writePart(data, bucketName, blobId, position); + } + int anyNonEmptyValue = 1; + return write.thenReturn(anyNonEmptyValue); + } + + private Mono<Void> saveBlobPartReference(BucketName bucketName, BlobId blobId, Integer numberOfChunk) { + if (isDefaultBucket(bucketName)) { + return defaultBucketDAO.saveBlobPartsReferences(blobId, numberOfChunk); + } else { + return bucketDAO.saveBlobPartsReferences(bucketName, blobId, numberOfChunk); + } + } + + private boolean isDefaultBucket(BucketName bucketName) { + return bucketName.equals(defaultBucket); + } + + @Override + public Mono<Void> delete(BucketName bucketName, BlobId blobId) { + if (isDefaultBucket(bucketName)) { + return defaultBucketDAO.deletePosition(blobId) + .then(defaultBucketDAO.deleteParts(blobId)); + } else { + return bucketDAO.deletePosition(bucketName, blobId) + .then(bucketDAO.deleteParts(bucketName, blobId)); + } + } + + @Override + public Mono<Void> deleteBucket(BucketName bucketName) { + Preconditions.checkNotNull(bucketName); + Preconditions.checkArgument(!isDefaultBucket(bucketName), "Deleting the default bucket is forbidden"); + + return bucketDAO.listAll() + .filter(bucketNameBlobIdPair -> bucketNameBlobIdPair.getKey().equals(bucketName)) + .map(Pair::getValue) + .flatMap(blobId -> delete(bucketName, blobId)) + .then(); + } + + private Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) { + if (isDefaultBucket(bucketName)) { + return defaultBucketDAO.readPart(blobId, partIndex); + } else { + return bucketDAO.readPart(bucketName, blobId, partIndex); + } + } + + private Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) { + if (isDefaultBucket(bucketName)) { + return defaultBucketDAO.selectRowCount(blobId); + } else { + return bucketDAO.selectRowCount(bucketName, blobId); + } + } + + private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) { + return selectRowCount(bucketName, blobId) + .single() + .onErrorMap(NoSuchElementException.class, e -> + new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId))) + .flatMapMany(rowCount -> Flux.range(0, rowCount) + .concatMap(partIndex -> readPart(bucketName, blobId, partIndex) + .single() + .onErrorMap(NoSuchElementException.class, e -> + new ObjectNotFoundException(String.format("Missing blob part for blobId %s and position %d", blobId.asString(), partIndex))))); + } + + private byte[] byteBuffersToBytesArray(List<ByteBuffer> byteBuffers) { + int targetSize = byteBuffers + .stream() + .mapToInt(ByteBuffer::remaining) + .sum(); + + return byteBuffers + .stream() + .reduce(ByteBuffer.allocate(targetSize), ByteBuffer::put) + .array(); + } +} diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java new file mode 100644 index 0000000..0e81087 --- /dev/null +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDumbBlobStoreTest.java @@ -0,0 +1,62 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.cassandra; + +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.DumbBlobStore; +import org.apache.james.blob.api.DumbBlobStoreContract; +import org.apache.james.blob.api.HashBlobId; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class CassandraDumbBlobStoreTest implements DumbBlobStoreContract { + private static final int CHUNK_SIZE = 10240; + private static final int MULTIPLE_CHUNK_SIZE = 3; + + @RegisterExtension + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraBlobModule.MODULE); + + private DumbBlobStore testee; + private CassandraDefaultBucketDAO defaultBucketDAO; + + @BeforeEach + void setUp(CassandraCluster cassandra) { + HashBlobId.Factory blobIdFactory = new HashBlobId.Factory(); + CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, cassandra.getConf()); + defaultBucketDAO = new CassandraDefaultBucketDAO(cassandra.getConf()); + testee = new CassandraDumbBlobStore( + defaultBucketDAO, + bucketDAO, + CassandraConfiguration.builder() + .blobPartSize(CHUNK_SIZE) + .build(), + blobIdFactory, + BucketName.DEFAULT); + } + + @Override + public DumbBlobStore testee() { + return testee; + } + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org