This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a683036ae8c0706916326024b6bd18cbc0eef931 Author: Gautier DI FOLCO <[email protected]> AuthorDate: Fri Aug 2 10:59:12 2019 +0200 JAMES-2851 Replace CassandraBlobStore's PipedStreamSubscriber by ReactorUtils.toInputStream --- .../james/blob/cassandra/CassandraBlobStore.java | 8 +- .../cassandra/utils/PipedStreamSubscriber.java | 89 ---------------------- 2 files changed, 2 insertions(+), 95 deletions(-) diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java index 9cdb7c7..171f215 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java @@ -37,8 +37,7 @@ import org.apache.james.blob.api.BucketName; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.api.ObjectNotFoundException; import org.apache.james.blob.cassandra.utils.DataChunker; -import org.apache.james.blob.cassandra.utils.PipedInputStreamHandlingError; -import org.apache.james.blob.cassandra.utils.PipedStreamSubscriber; +import org.apache.james.util.ReactorUtils; import com.datastax.driver.core.Session; import com.google.common.annotations.VisibleForTesting; @@ -120,10 +119,7 @@ public class CassandraBlobStore implements BlobStore { @Override public InputStream read(BucketName bucketName, BlobId blobId) { - PipedInputStreamHandlingError pipedInputStream = new PipedInputStreamHandlingError(); - readBlobParts(bucketName, blobId) - .subscribe(new PipedStreamSubscriber(pipedInputStream)); - return pipedInputStream; + return ReactorUtils.toInputStream(readBlobParts(bucketName, blobId)); } @Override diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java deleted file mode 100644 index f9fcade..0000000 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java +++ /dev/null @@ -1,89 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.blob.cassandra.utils; - -import java.io.IOException; -import java.io.UncheckedIOException; - -import org.reactivestreams.Subscription; - -import com.google.common.base.Preconditions; - -import reactor.core.publisher.BaseSubscriber; - -public class PipedStreamSubscriber extends BaseSubscriber<byte[]> { - private final PipedInputStreamHandlingError in; - private PipedOutputStreamHandlingError out; - - public PipedStreamSubscriber(PipedInputStreamHandlingError in) { - Preconditions.checkNotNull(in, "The input stream must not be null"); - this.in = in; - } - - @Override - protected void hookOnSubscribe(Subscription subscription) { - super.hookOnSubscribe(subscription); - try { - this.out = new PipedOutputStreamHandlingError(in); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @Override - protected void hookOnNext(byte[] payload) { - try { - out.write(payload); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @Override - protected void hookOnComplete() { - close(); - } - - @Override - protected void hookOnError(Throwable error) { - if (error instanceof RuntimeException) { - out.propagateError((RuntimeException) error); - } else { - out.propagateError(new RuntimeException(error)); - } - - close(); - } - - @Override - protected void hookOnCancel() { - close(); - } - - private void close() { - try { - if (out != null) { - out.close(); - } - } catch (IOException ignored) { - //ignored - } - } -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
