JAMES-2623 Implement Cassandra BlobStore with Reactor
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/a4875cfa Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/a4875cfa Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/a4875cfa Branch: refs/heads/master Commit: a4875cfa96fdce5ca044e1d6e8adc683658c7b2f Parents: 79e46ad Author: Matthieu Baechler <[email protected]> Authored: Fri Dec 7 14:52:21 2018 +0100 Committer: Antoine Duprat <[email protected]> Committed: Thu Dec 20 14:38:47 2018 +0100 ---------------------------------------------------------------------- .../james/blob/cassandra/CassandraBlobsDAO.java | 184 ++++++++----------- .../cassandra/utils/PipedStreamSubscriber.java | 90 +++++++++ .../src/resources/test/logback-test.xml | 28 +++ 3 files changed, 198 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/a4875cfa/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java index e079176..01c6e8d 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java @@ -24,15 +24,12 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; -import java.io.IOException; import java.io.InputStream; +import java.io.PipedInputStream; import java.nio.ByteBuffer; -import 
java.nio.channels.Channels; -import java.nio.channels.Pipe; -import java.util.Optional; +import java.util.Comparator; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.stream.IntStream; +import java.util.stream.Collectors; import java.util.stream.Stream; import javax.inject.Inject; @@ -47,24 +44,23 @@ import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.api.ObjectStoreException; import org.apache.james.blob.cassandra.BlobTable.BlobParts; import org.apache.james.blob.cassandra.utils.DataChunker; -import org.apache.james.util.FluentFutureStream; -import org.apache.james.util.OptionalUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.james.blob.cassandra.utils.PipedStreamSubscriber; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.github.fge.lambdas.Throwing; -import com.github.fge.lambdas.consumers.ConsumerChainer; -import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.primitives.Bytes; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class CassandraBlobsDAO implements BlobStore { - private static final Logger LOGGER = LoggerFactory.getLogger(CassandraBlobsDAO.class); + + private static final int PREFETCH = 16; + private static final int MAX_CONCURRENCY = 2; private final CassandraAsyncExecutor cassandraAsyncExecutor; private final PreparedStatement insert; private final PreparedStatement insertPart; @@ -122,71 +118,70 @@ public class CassandraBlobsDAO implements BlobStore { public CompletableFuture<BlobId> save(byte[] data) { Preconditions.checkNotNull(data); - HashBlobId blobId = blobIdFactory.forPayload(data); + return 
saveAsMono(data).toFuture(); + } + + private Mono<BlobId> saveAsMono(byte[] data) { + BlobId blobId = blobIdFactory.forPayload(data); return saveBlobParts(data, blobId) - .thenCompose(numberOfChunk -> saveBlobPartsReferences(blobId, numberOfChunk)) - .thenApply(any -> blobId); + .flatMap(numberOfChunk -> saveBlobPartsReferences(blobId, numberOfChunk)); } - private CompletableFuture<Integer> saveBlobParts(byte[] data, HashBlobId blobId) { - return FluentFutureStream.of( - dataChunker.chunk(data, configuration.getBlobPartSize()) - .map(pair -> writePart(pair.getRight(), blobId, pair.getKey()) - .thenApply(partId -> Pair.of(pair.getKey(), partId)))) - .completableFuture() - .thenApply(stream -> - getLastOfStream(stream) - .map(numOfChunkAndPartId -> numOfChunkAndPartId.getLeft() + 1) - .orElse(0)); + private Mono<Integer> saveBlobParts(byte[] data, BlobId blobId) { + Stream<Pair<Integer, ByteBuffer>> chunks = dataChunker.chunk(data, configuration.getBlobPartSize()); + return Flux.fromStream(chunks) + .publishOn(Schedulers.elastic(), PREFETCH) + .flatMap(pair -> writePart(pair.getValue(), blobId, getChunkNum(pair))) + .collect(Collectors.maxBy(Comparator.comparingInt(x -> x))) + .flatMap(Mono::justOrEmpty) + .map(this::numToCount) + .defaultIfEmpty(0); } - private static <T> Optional<T> getLastOfStream(Stream<T> stream) { - return stream.reduce((first, second) -> second); + private int numToCount(int number) { + return number + 1; } - private CompletableFuture<Void> writePart(ByteBuffer data, HashBlobId blobId, int position) { - return cassandraAsyncExecutor.executeVoid( + private Integer getChunkNum(Pair<Integer, ByteBuffer> pair) { + return pair.getKey(); + } + + private Mono<Integer> writePart(ByteBuffer data, BlobId blobId, int position) { + return cassandraAsyncExecutor.executeVoidReactor( insertPart.bind() .setString(BlobTable.ID, blobId.asString()) .setInt(BlobParts.CHUNK_NUMBER, position) - .setBytes(BlobParts.DATA, data)); + .setBytes(BlobParts.DATA, data)) + 
.then(Mono.just(position)); } - private CompletableFuture<Void> saveBlobPartsReferences(HashBlobId blobId, int numberOfChunk) { - return cassandraAsyncExecutor.executeVoid(insert.bind() - .setString(BlobTable.ID, blobId.asString()) - .setInt(BlobTable.NUMBER_OF_CHUNK, numberOfChunk)); + private Mono<BlobId> saveBlobPartsReferences(BlobId blobId, int numberOfChunk) { + return cassandraAsyncExecutor.executeVoidReactor( + insert.bind() + .setString(BlobTable.ID, blobId.asString()) + .setInt(BlobTable.NUMBER_OF_CHUNK, numberOfChunk)) + .then(Mono.just(blobId)); } @Override public CompletableFuture<byte[]> readBytes(BlobId blobId) { - CompletableFuture<Row> futureRow = cassandraAsyncExecutor - .executeSingleRow( - select.bind() - .setString(BlobTable.ID, blobId.asString())) - .thenApply(x -> x.orElseThrow(() -> new ObjectStoreException(String.format("Could not retrieve blob metadata for %s", blobId)))); - return toDataParts(futureRow.join(), blobId) - .thenApply(this::concatenateDataParts); - } - - private CompletableFuture<Stream<BlobPart>> toDataParts(Row blobRow, BlobId blobId) { - int numOfChunk = blobRow.getInt(BlobTable.NUMBER_OF_CHUNK); - return FluentFutureStream.of( - IntStream.range(0, numOfChunk) - .mapToObj(position -> readPart(blobId, position))) - .completableFuture(); + try { + return readBlobParts(blobId) + .collectList() + .map(parts -> Bytes.concat(parts.toArray(new byte[0][]))) + .toFuture(); + } catch (ObjectStoreException e) { + CompletableFuture<byte[]> error = new CompletableFuture<>(); + error.completeExceptionally(e); + return error; + } } - private byte[] concatenateDataParts(Stream<BlobPart> blobParts) { - ImmutableList<byte[]> parts = blobParts - .map(blobPart -> OptionalUtils.executeIfEmpty( - blobPart.row, - () -> LOGGER.warn("Missing blob part for blobId {} and position {}", blobPart.blobId, blobPart.position))) - .flatMap(OptionalUtils::toStream) - .map(this::rowToData) - .collect(Guavate.toImmutableList()); - - return 
Bytes.concat(parts.toArray(new byte[parts.size()][])); + private Mono<Integer> selectRowCount(BlobId blobId) { + return cassandraAsyncExecutor.executeSingleRowReactor( + select.bind() + .setString(BlobTable.ID, blobId.asString())) + .map(row -> row.getInt(BlobTable.NUMBER_OF_CHUNK)); } private byte[] rowToData(Row row) { @@ -195,60 +190,41 @@ public class CassandraBlobsDAO implements BlobStore { return data; } - private CompletableFuture<BlobPart> readPart(BlobId blobId, int position) { - return cassandraAsyncExecutor.executeSingleRow( + private Mono<byte[]> readPart(BlobId blobId, int position) { + return cassandraAsyncExecutor.executeSingleRowReactor( selectPart.bind() .setString(BlobTable.ID, blobId.asString()) .setInt(BlobParts.CHUNK_NUMBER, position)) - .thenApply(row -> new BlobPart(blobId, position, row)); - } - - private static class BlobPart { - private final BlobId blobId; - private final int position; - private final Optional<Row> row; - - public BlobPart(BlobId blobId, int position, Optional<Row> row) { - Preconditions.checkNotNull(blobId); - Preconditions.checkArgument(position >= 0, "position need to be positive"); - this.blobId = blobId; - this.position = position; - this.row = row; - } + .map(this::rowToData) + .switchIfEmpty(Mono.error(new IllegalStateException( + String.format("Missing blob part for blobId %s and position %d", blobId, position)))); } @Override public InputStream read(BlobId blobId) { - try { - Pipe pipe = Pipe.open(); - ConsumerChainer<ByteBuffer> consumer = Throwing.consumer( - bytes -> { - try (Pipe.SinkChannel sink = pipe.sink()) { - sink.write(bytes); - } - } - ); - readBytes(blobId) - .thenApply(ByteBuffer::wrap) - .thenAccept(consumer.sneakyThrow()); - return Channels.newInputStream(pipe.source()); - } catch (CompletionException e) { - if (e.getCause() instanceof ObjectStoreException) { - throw (ObjectStoreException)(e.getCause()); - } - throw new RuntimeException(e); - } catch (IOException cause) { - throw new 
ObjectStoreException( - "Failed to convert CompletableFuture<byte[]> to InputStream", - cause); - } + PipedInputStream pipedInputStream = new PipedInputStream(); + readBlobParts(blobId) + .subscribe(new PipedStreamSubscriber(pipedInputStream)); + return pipedInputStream; + } + + private Flux<byte[]> readBlobParts(BlobId blobId) { + Integer rowCount = selectRowCount(blobId) + .publishOn(Schedulers.elastic()) + .switchIfEmpty(Mono.error( + new ObjectStoreException(String.format("Could not retrieve blob metadata for %s", blobId)))) + .block(); + return Flux.range(0, rowCount) + .publishOn(Schedulers.elastic(), PREFETCH) + .flatMapSequential(partIndex -> readPart(blobId, partIndex), MAX_CONCURRENCY, PREFETCH); } @Override public CompletableFuture<BlobId> save(InputStream data) { Preconditions.checkNotNull(data); - return CompletableFuture - .supplyAsync(Throwing.supplier(() -> IOUtils.toByteArray(data)).sneakyThrow()) - .thenCompose(this::save); + return Mono.fromSupplier(Throwing.supplier(() -> IOUtils.toByteArray(data)).sneakyThrow()) + .publishOn(Schedulers.elastic()) + .flatMap(this::saveAsMono) + .toFuture(); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/a4875cfa/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java new file mode 100644 index 0000000..801d4c9 --- /dev/null +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java @@ -0,0 +1,90 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. 
See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.cassandra.utils; + +import java.io.IOException; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.io.UncheckedIOException; + +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import reactor.core.publisher.BaseSubscriber; + +public class PipedStreamSubscriber extends BaseSubscriber<byte[]> { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final PipedInputStream in; + private PipedOutputStream out; + + public PipedStreamSubscriber(PipedInputStream in) { + Preconditions.checkNotNull(in, "The input stream must not be null"); + this.in = in; + } + + @Override + protected void hookOnSubscribe(Subscription subscription) { + super.hookOnSubscribe(subscription); + try { + this.out = new PipedOutputStream(in); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + protected void hookOnNext(byte[] payload) { + try { + out.write(payload); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + protected void 
hookOnComplete() { + close(); + } + + @Override + protected void hookOnError(Throwable error) { + logger.error("Failure processing stream", error); + close(); + } + + @Override + protected void hookOnCancel() { + close(); + } + + private void close() { + try { + if (out != null) { + out.close(); + } + } catch (IOException ignored) { + //ignored + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/a4875cfa/server/blob/blob-cassandra/src/resources/test/logback-test.xml ---------------------------------------------------------------------- diff --git a/server/blob/blob-cassandra/src/resources/test/logback-test.xml b/server/blob/blob-cassandra/src/resources/test/logback-test.xml new file mode 100644 index 0000000..b02f0d8 --- /dev/null +++ b/server/blob/blob-cassandra/src/resources/test/logback-test.xml @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + + <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator"> + <resetJUL>true</resetJUL> + </contextListener> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%-5level] %logger{15} - %msg%n%rEx</pattern> + <immediateFlush>false</immediateFlush> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="CONSOLE" /> + </root> + + + <logger name="org.apache.james" level="WARN" > + <appender-ref ref="CONSOLE" /> + </logger> + + <logger name="org.apache.james.backends.cassandra.DockerCassandraRule" level="WARN" > + <appender-ref ref="CONSOLE" /> + </logger> + +</configuration> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
