This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 3c8d13c GCS offload support(2): replace `s3client` api with `jclouds` related api (#2065) 3c8d13c is described below commit 3c8d13c0546b7fed0100d499bd1841f7ce2a127e Author: Jia Zhai <zhaiji...@gmail.com> AuthorDate: Fri Jul 20 13:27:05 2018 +0800 GCS offload support(2): replace `s3client` api with `jclouds` related api (#2065) This is the second part to support `Google Cloud Storage` offload. It aims to replace "s3 client" api with "jclouds" api, and make sure unit test and integration test passed. There will be a following change to add `Google Cloud Storage` support and related test. change: replace `s3client` api with `jclouds` related api in `S3ManagedLedgerOffloader` Master Issue: #2067 --- conf/broker.conf | 2 +- distribution/server/src/assemble/LICENSE.bin.txt | 10 +- jclouds-shaded/pom.xml | 105 +++++++ pom.xml | 2 + pulsar-broker/pom.xml | 8 +- .../org/apache/pulsar/broker/PulsarService.java | 6 +- ...pl.java => BlobStoreBackedInputStreamImpl.java} | 63 ++-- ...mpl.java => BlobStoreBackedReadHandleImpl.java} | 49 ++- ...r.java => BlobStoreManagedLedgerOffloader.java} | 220 +++++++++----- ...st.java => BlobStoreBackedInputStreamTest.java} | 114 ++++--- .../{S3TestBase.java => BlobStoreTestBase.java} | 36 ++- .../org/apache/pulsar/broker/offload/S3Mock.java | 334 --------------------- ...va => BlobStoreManagedLedgerOffloaderTest.java} | 190 ++++++------ ...luster-2-bookie-1-broker-unstarted-with-s3.yaml | 4 +- 14 files changed, 518 insertions(+), 625 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 2e40d82..96433aa 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -486,7 +486,7 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe ### --- Ledger Offloading --- ### -# Driver to use to offload old data to long term storage (Possible values: S3) +# Driver to use to offload old data to long term storage (Possible values: S3, aws-s3, google-cloud-storage) managedLedgerOffloadDriver= # Maximum number of thread pool threads for ledger offloading diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 33b9748..85a97e5 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -334,10 +334,10 @@ The Apache Software License, Version 2.0 - io.swagger-swagger-annotations-1.5.3.jar - io.swagger-swagger-core-1.5.3.jar - io.swagger-swagger-models-1.5.3.jar - * DataSketches + * DataSketches - com.yahoo.datasketches-memory-0.8.3.jar - com.yahoo.datasketches-sketches-core-0.8.3.jar - * Apache Commons + * Apache Commons - commons-beanutils-commons-beanutils-1.7.0.jar - commons-beanutils-commons-beanutils-core-1.8.0.jar - commons-cli-commons-cli-1.2.jar @@ -461,6 +461,12 @@ The Apache Software License, Version 2.0 - org.xerial.snappy-snappy-java-1.1.1.3.jar * Flatbuffers Java - com.google.flatbuffers-flatbuffers-java-1.9.0.jar + * Apache Jclouds + - org.apache.jclouds-allblobstore-2.2.0-SNAPSHOT.jar + * Google Guice Core Library + - com.google.inject.guice-3.0.jar + - com.google.inject.extensions:guice-multibindings-3.0.jar + - com.google.inject.extensions:guice-assistedinject-3.0.jar BSD 3-clause "New" or "Revised" License diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml new file mode 100644 index 0000000..8aa1786 --- /dev/null +++ b/jclouds-shaded/pom.xml @@ -0,0 +1,105 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar</artifactId> + <version>2.2.0-incubating-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>jclouds-shaded</artifactId> + <name>Apache Pulsar :: Jclouds shaded</name> + + <dependencies> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.5</version> + </dependency> + <dependency> + <groupId>org.apache.jclouds</groupId> + <artifactId>jclouds-allblobstore</artifactId> + <version>2.2.0-SNAPSHOT</version> + </dependency> + </dependencies> + + <repositories> + <repository> + <id>jclouds-snapshots</id> + <url>https://repository.apache.org/content/repositories/snapshots</url> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + </repositories> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + <promoteTransitiveDependencies>true</promoteTransitiveDependencies> + <minimizeJar>false</minimizeJar> + + <artifactSet> + <includes> + <include>com.google.code.gson:gson</include> + <include>com.google.guava:guava</include> + <include>org.apache.jclouds:*</include> + <include>org.apache.jclouds.api:*</include> + <include>org.apache.jclouds.common:*</include> + <include>org.apache.jclouds.provider:*</include> + <include>com.google.inject.extensions:guice-assistedinject</include> + <include>com.google.inject:guice</include> + <include>com.google.inject.extensions:guice-multibindings</include> + </includes> + </artifactSet> + + <relocations> + <relocation> + <pattern>com.google</pattern> + <shadedPattern>org.apache.pulsar.shaded.com.google</shadedPattern> + </relocation> + </relocations> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> + <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" /> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/pom.xml b/pom.xml index 263d362..6369a10 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,8 @@ flexible messaging model and an intuitive client API.</description> <module>pulsar-zookeeper</module> <module>pulsar-log4j2-appender</module> <module>protobuf-shaded</module> + <!-- jclouds shaded for gson conflict: https://issues.apache.org/jira/browse/JCLOUDS-1166 --> + <module>jclouds-shaded</module> <!-- functions-related modules --> <module>pulsar-functions</module> diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index d505c7a..ac264bd 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -127,7 +127,7 @@ <dependency> <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-s3</artifactId> + <artifactId>aws-java-sdk-core</artifactId> </dependency> <!-- functions related dependencies (begin) --> @@ -273,6 +273,12 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>jclouds-shaded</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 2a341af..c0aa9d2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -61,7 +61,7 @@ import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask; import org.apache.pulsar.broker.loadbalance.LoadSheddingTask; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader; +import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; @@ -657,8 +657,8 @@ public class PulsarService implements AutoCloseable { public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf) throws PulsarServerException { if (conf.getManagedLedgerOffloadDriver() != null - && conf.getManagedLedgerOffloadDriver().equalsIgnoreCase(S3ManagedLedgerOffloader.DRIVER_NAME)) { - return S3ManagedLedgerOffloader.create(conf, getOffloaderScheduler(conf)); + && BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver())) { + return BlobStoreManagedLedgerOffloader.create(conf, getOffloaderScheduler(conf)); } else { return NullLedgerOffloader.INSTANCE; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java similarity index 68% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java index e55e61b..19fac59 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java @@ -18,27 +18,22 @@ */ package org.apache.pulsar.broker.offload.impl; -import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.S3Object; - import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; - -import java.io.InputStream; import java.io.IOException; - +import java.io.InputStream; import org.apache.pulsar.broker.offload.BackedInputStream; -import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck; - +import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.VersionCheck; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.options.GetOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class S3BackedInputStreamImpl extends BackedInputStream { - private static final Logger log = LoggerFactory.getLogger(S3BackedInputStreamImpl.class); +public class BlobStoreBackedInputStreamImpl extends BackedInputStream { + private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedInputStreamImpl.class); - private final AmazonS3 s3client; + private final BlobStore blobStore; private final String bucket; private final String key; private final VersionCheck versionCheck; @@ -50,10 +45,10 @@ public class S3BackedInputStreamImpl extends BackedInputStream { private long bufferOffsetStart; private long bufferOffsetEnd; - public S3BackedInputStreamImpl(AmazonS3 s3client, String bucket, String key, - VersionCheck versionCheck, - long objectLen, int bufferSize) { - this.s3client = s3client; + public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String key, + VersionCheck versionCheck, + long objectLen, int bufferSize) { + this.blobStore = blobStore; this.bucket = bucket; this.key = key; this.versionCheck = versionCheck; @@ -76,26 +71,24 @@ public class S3BackedInputStreamImpl extends BackedInputStream { long startRange = cursor; long endRange = Math.min(cursor + bufferSize - 1, objectLen - 1); - GetObjectRequest req = new GetObjectRequest(bucket, key) - .withRange(startRange, endRange); - log.debug("Reading range {}-{} from {}/{}", startRange, endRange, bucket, key); - try (S3Object obj = s3client.getObject(req)) { - versionCheck.check(key, obj.getObjectMetadata()); - - Long[] range = obj.getObjectMetadata().getContentRange(); - long bytesRead = range[1] - range[0] + 1; - buffer.clear(); - bufferOffsetStart = range[0]; - bufferOffsetEnd = range[1]; - InputStream s = obj.getObjectContent(); - int bytesToCopy = (int)bytesRead; - while (bytesToCopy > 0) { - bytesToCopy -= buffer.writeBytes(s, bytesToCopy); + try { + Blob blob = blobStore.getBlob(bucket, key, new GetOptions().range(startRange, endRange)); + versionCheck.check(key, blob); + + try (InputStream stream = blob.getPayload().openStream()) { + buffer.clear(); + bufferOffsetStart = startRange; + bufferOffsetEnd = endRange; + long bytesRead = endRange - startRange + 1; + int bytesToCopy = (int) bytesRead; + while (bytesToCopy > 0) { + bytesToCopy -= buffer.writeBytes(stream, bytesToCopy); + } + cursor += buffer.readableBytes(); } - cursor += buffer.readableBytes(); - } catch (AmazonClientException e) { - throw new IOException("Error reading from S3", e); + } catch (Throwable e) { + throw new IOException("Error reading from BlobStore", e); } } return true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java similarity index 82% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java index 08b5ea6..36b382b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java @@ -19,13 +19,8 @@ package org.apache.pulsar.broker.offload.impl; import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.S3Object; - import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; - import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; @@ -33,7 +28,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; - import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -42,18 +36,18 @@ import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; - +import org.apache.pulsar.broker.offload.BackedInputStream; import org.apache.pulsar.broker.offload.OffloadIndexBlock; import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder; import org.apache.pulsar.broker.offload.OffloadIndexEntry; -import org.apache.pulsar.broker.offload.BackedInputStream; -import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck; - +import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.VersionCheck; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.domain.Blob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class S3BackedReadHandleImpl implements ReadHandle { - private static final Logger log = LoggerFactory.getLogger(S3BackedReadHandleImpl.class); +public class BlobStoreBackedReadHandleImpl implements ReadHandle { + private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class); private final long ledgerId; private final OffloadIndexBlock index; @@ -61,9 +55,9 @@ public class S3BackedReadHandleImpl implements ReadHandle { private final DataInputStream dataStream; private final ExecutorService executor; - private S3BackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, - BackedInputStream inputStream, - ExecutorService executor) { + private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, + BackedInputStream inputStream, + ExecutorService executor) { this.ledgerId = ledgerId; this.index = index; this.inputStream = inputStream; @@ -189,22 +183,19 @@ public class S3BackedReadHandleImpl implements ReadHandle { } public static ReadHandle open(ScheduledExecutorService executor, - AmazonS3 s3client, String bucket, String key, String indexKey, + BlobStore blobStore, String bucket, String key, String indexKey, VersionCheck versionCheck, long ledgerId, int readBufferSize) throws AmazonClientException, IOException { - GetObjectRequest req = new GetObjectRequest(bucket, indexKey); - try (S3Object obj = s3client.getObject(req)) { - versionCheck.check(indexKey, obj.getObjectMetadata()); - - OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create(); - OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent()); - - BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key, - versionCheck, - index.getDataObjectLength(), - readBufferSize); - return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor); - } + Blob blob = blobStore.getBlob(bucket, indexKey); + versionCheck.check(indexKey, blob); + OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create(); + OffloadIndexBlock index = indexBuilder.fromStream(blob.getPayload().openStream()); + + BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key, + versionCheck, + index.getDataObjectLength(), + readBufferSize); + return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java similarity index 55% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java index ec74d27..528ba28 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java @@ -18,24 +18,17 @@ */ package org.apache.pulsar.broker.offload.impl; -import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; -import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.UploadPartRequest; -import com.amazonaws.services.s3.model.UploadPartResult; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import java.io.IOException; -import java.util.LinkedList; +import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.api.ReadHandle; @@ -47,21 +40,48 @@ import org.apache.pulsar.broker.offload.BlockAwareSegmentInputStream; import org.apache.pulsar.broker.offload.OffloadIndexBlock; import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder; import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils; +import org.jclouds.Constants; +import org.jclouds.ContextBuilder; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.domain.BlobBuilder; +import org.jclouds.blobstore.domain.MultipartPart; +import org.jclouds.blobstore.domain.MultipartUpload; +import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.domain.Location; +import org.jclouds.domain.LocationBuilder; +import org.jclouds.domain.LocationScope; +import org.jclouds.io.Payload; +import org.jclouds.io.Payloads; +import org.jclouds.s3.reference.S3Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class S3ManagedLedgerOffloader implements LedgerOffloader { - private static final Logger log = LoggerFactory.getLogger(S3ManagedLedgerOffloader.class); +public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { + private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloader.class); - public static final String DRIVER_NAME = "S3"; + public static final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage"}; static final String METADATA_FORMAT_VERSION_KEY = "S3ManagedLedgerOffloaderFormatVersion"; static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion"; static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha"; static final String CURRENT_VERSION = String.valueOf(1); - private final VersionCheck VERSION_CHECK = (key, metadata) -> { - String version = metadata.getUserMetadata().get(METADATA_FORMAT_VERSION_KEY); + public static boolean driverSupported(String driver) { + return Arrays.stream(DRIVER_NAMES).anyMatch(d -> d.equalsIgnoreCase(driver)); + } + + private static void addVersionInfo(BlobBuilder blobBuilder) { + blobBuilder.userMetadata(ImmutableMap.of( + METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION, + METADATA_SOFTWARE_VERSION_KEY, PulsarBrokerVersionStringUtils.getNormalizedVersionString(), + METADATA_SOFTWARE_GITSHA_KEY, PulsarBrokerVersionStringUtils.getGitSha())); + } + + private final VersionCheck VERSION_CHECK = (key, blob) -> { + // NOTE all metadata in jclouds comes out as lowercase, in an effort to normalize the providers + String version = blob.getMetadata().getUserMetadata().get(METADATA_FORMAT_VERSION_KEY.toLowerCase()); if (version == null || !version.equals(CURRENT_VERSION)) { throw new IOException(String.format("Invalid object version %s for %s, expect %s", version, key, CURRENT_VERSION)); @@ -69,15 +89,21 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { }; private final OrderedScheduler scheduler; - private final AmazonS3 s3client; + + // container in jclouds private final String bucket; // max block size for each data block. private int maxBlockSize; private final int readBufferSize; - public static S3ManagedLedgerOffloader create(ServiceConfiguration conf, - OrderedScheduler scheduler) + private BlobStoreContext context; + private BlobStore blobStore; + Location location = null; + + public static BlobStoreManagedLedgerOffloader create(ServiceConfiguration conf, + OrderedScheduler scheduler) throws PulsarServerException { + String driver = conf.getManagedLedgerOffloadDriver(); String region = conf.getS3ManagedLedgerOffloadRegion(); String bucket = conf.getS3ManagedLedgerOffloadBucket(); String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint(); @@ -96,23 +122,66 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { throw new PulsarServerException("s3ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB"); } - AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard(); + return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler, maxBlockSize, readBufferSize, endpoint, region); + } + + // build context for jclouds BlobStoreContext + BlobStoreManagedLedgerOffloader(String driver, String container, OrderedScheduler scheduler, + int maxBlockSize, int readBufferSize, String endpoint, String region) { + this.scheduler = scheduler; + this.readBufferSize = readBufferSize; + + this.bucket = container; + this.maxBlockSize = maxBlockSize; + + Properties overrides = new Properties(); + // This property controls the number of parts being uploaded in parallel. + overrides.setProperty("jclouds.mpu.parallel.degree", "1"); + overrides.setProperty("jclouds.mpu.parts.size", Integer.toString(maxBlockSize)); + overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000"); + overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, Integer.toString(100)); + + ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver); + + AWSCredentials credentials = null; + try { + DefaultAWSCredentialsProviderChain creds = DefaultAWSCredentialsProviderChain.getInstance(); + credentials = creds.getCredentials(); + } catch (Exception e) { + log.error("Exception when get credentials for s3 ", e); + } + + String id = "accesskey"; + String key = "secretkey"; + if (credentials != null) { + id = credentials.getAWSAccessKeyId(); + key = credentials.getAWSSecretKey(); + } + contextBuilder.credentials(id, key); + if (!Strings.isNullOrEmpty(endpoint)) { - builder.setEndpointConfiguration(new EndpointConfiguration(endpoint, region)); - builder.setPathStyleAccessEnabled(true); - } else { - builder.setRegion(region); + contextBuilder.endpoint(endpoint); + overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false"); + } + if (!Strings.isNullOrEmpty(region)) { + this.location = new LocationBuilder().scope(LocationScope.REGION).id(region).description(region).build(); } - return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize, readBufferSize); + + log.info("Constructor driver: {}, host: {}, container: {}, region: {} ", driver, endpoint, bucket, region); + + contextBuilder.overrides(overrides); + this.context = contextBuilder.buildView(BlobStoreContext.class); + this.blobStore = context.getBlobStore(); } - S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, OrderedScheduler scheduler, - int maxBlockSize, int readBufferSize) { - this.s3client = s3client; - this.bucket = bucket; + // build context for jclouds BlobStoreContext + BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container, OrderedScheduler scheduler, + int maxBlockSize, int readBufferSize) { this.scheduler = scheduler; - this.maxBlockSize = maxBlockSize; this.readBufferSize = readBufferSize; + this.bucket = container; + this.maxBlockSize = maxBlockSize; + this.blobStore = blobStore; } static String dataBlockOffloadKey(long ledgerId, UUID uuid) { @@ -141,15 +210,15 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid); String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid); - ObjectMetadata dataMetadata = new ObjectMetadata(); - addVersionInfo(dataMetadata); - - InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey, dataMetadata); - InitiateMultipartUploadResult dataBlockRes = null; + MultipartUpload mpu = null; + List<MultipartPart> parts = Lists.newArrayList(); // init multi part upload for data block. try { - dataBlockRes = s3client.initiateMultipartUpload(dataBlockReq); + BlobBuilder blobBuilder = blobStore.blobBuilder(dataBlockKey); + addVersionInfo(blobBuilder); + Blob blob = blobBuilder.build(); + mpu = blobStore.initiateMultipartUpload(bucket, blob.getMetadata(), new PutOptions()); } catch (Throwable t) { promise.completeExceptionally(t); return; @@ -161,7 +230,6 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { long startEntry = 0; int partId = 1; long entryBytesWritten = 0; - List<PartETag> etags = new LinkedList<>(); while (startEntry <= readHandle.getLastAddConfirmed()) { int blockSize = BlockAwareSegmentInputStreamImpl .calculateBlockSize(maxBlockSize, readHandle, startEntry, entryBytesWritten); @@ -169,15 +237,13 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl( readHandle, startEntry, blockSize)) { - UploadPartResult uploadRes = s3client.uploadPart( - new UploadPartRequest() - .withBucketName(bucket) - .withKey(dataBlockKey) - .withUploadId(dataBlockRes.getUploadId()) - .withInputStream(blockStream) - .withPartSize(blockSize) - .withPartNumber(partId)); - etags.add(uploadRes.getPartETag()); + Payload partPayload = Payloads.newInputStreamPayload(blockStream); + partPayload.getContentMetadata().setContentLength((long)blockSize); + partPayload.getContentMetadata().setContentType("application/octet-stream"); + parts.add(blobStore.uploadMultipartPart(mpu, partId, partPayload)); + log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}", + bucket, dataBlockKey, partId, mpu.id()); + indexBuilder.addBlock(startEntry, partId, blockSize); if (blockStream.getEndEntryId() != -1) { @@ -193,17 +259,16 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { dataObjectLength += blockSize; } - s3client.completeMultipartUpload(new CompleteMultipartUploadRequest() - .withBucketName(bucket).withKey(dataBlockKey) - .withUploadId(dataBlockRes.getUploadId()) - .withPartETags(etags)); + blobStore.completeMultipartUpload(mpu, parts); + mpu = null; } catch (Throwable t) { try { - s3client.abortMultipartUpload( - new AbortMultipartUploadRequest(bucket, dataBlockKey, dataBlockRes.getUploadId())); + if (mpu != null) { + blobStore.abortMultipartUpload(mpu); + } } catch (Throwable throwable) { log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.", - bucket, dataBlockKey, dataBlockRes.getUploadId(), throwable); + bucket, dataBlockKey, mpu.id(), throwable); } promise.completeExceptionally(t); return; @@ -213,19 +278,22 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build(); OffloadIndexBlock.IndexInputStream indexStream = index.toStream()) { // write the index block - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(indexStream.getStreamSize()); - addVersionInfo(metadata); - - s3client.putObject(new PutObjectRequest( - bucket, - indexBlockKey, - indexStream, - metadata)); + BlobBuilder blobBuilder = blobStore.blobBuilder(indexBlockKey); + addVersionInfo(blobBuilder); + Payload indexPayload = Payloads.newInputStreamPayload(indexStream); + indexPayload.getContentMetadata().setContentLength((long)indexStream.getStreamSize()); + indexPayload.getContentMetadata().setContentType("application/octet-stream"); + + Blob blob = blobBuilder + .payload(indexPayload) + .contentLength((long)indexStream.getStreamSize()) + .build(); + + blobStore.putBlob(bucket, blob); promise.complete(null); } catch (Throwable t) { try { - s3client.deleteObject(bucket, dataBlockKey); + blobStore.removeBlob(bucket, dataBlockKey); } catch (Throwable throwable) { log.error("Failed deleteObject in bucket - {} with key - {}.", bucket, dataBlockKey, throwable); @@ -244,35 +312,31 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { String indexKey = indexBlockOffloadKey(ledgerId, uid); scheduler.chooseThread(ledgerId).submit(() -> { try { - promise.complete(S3BackedReadHandleImpl.open(scheduler.chooseThread(ledgerId), - s3client, + promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId), + blobStore, bucket, key, indexKey, VERSION_CHECK, ledgerId, readBufferSize)); } catch (Throwable t) { + log.error("Failed readOffloaded: ", t); promise.completeExceptionally(t); } }); return promise; } - private static void addVersionInfo(ObjectMetadata metadata) { - metadata.getUserMetadata().put(METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION); - metadata.getUserMetadata().put(METADATA_SOFTWARE_VERSION_KEY, - PulsarBrokerVersionStringUtils.getNormalizedVersionString()); - metadata.getUserMetadata().put(METADATA_SOFTWARE_GITSHA_KEY, PulsarBrokerVersionStringUtils.getGitSha()); - } + @Override public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) { CompletableFuture<Void> promise = new CompletableFuture<>(); scheduler.chooseThread(ledgerId).submit(() -> { try { - s3client.deleteObjects(new DeleteObjectsRequest(bucket) - .withKeys(dataBlockOffloadKey(ledgerId, uid), indexBlockOffloadKey(ledgerId, uid))); + blobStore.removeBlobs(bucket, + ImmutableList.of(dataBlockOffloadKey(ledgerId, uid), indexBlockOffloadKey(ledgerId, uid))); promise.complete(null); } catch (Throwable t) { - log.error("Failed delete s3 Object ", t); + log.error("Failed delete Blob", t); promise.completeExceptionally(t); } }); @@ -281,7 +345,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { } public interface VersionCheck { - void check(String key, ObjectMetadata md) throws IOException; + void check(String key, Blob blob) throws IOException; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java similarity index 61% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java index 45bca52..bde4dde 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java @@ -18,29 +18,35 @@ */ package org.apache.pulsar.broker.offload; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.spy; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ObjectMetadata; - -import java.io.InputStream; import java.io.IOException; +import java.io.InputStream; import java.util.HashMap; import java.util.Map; import java.util.Random; - import lombok.extern.slf4j.Slf4j; - -import org.apache.pulsar.broker.offload.impl.S3BackedInputStreamImpl; - +import org.apache.pulsar.broker.offload.impl.BlobStoreBackedInputStreamImpl; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.options.GetOptions; +import org.jclouds.io.Payload; +import org.jclouds.io.Payloads; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.Test; @Slf4j -class S3BackedInputStreamTest extends S3TestBase { +class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { + private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedInputStreamTest.class); + class RandomInputStream extends InputStream { final Random r; int bytesRemaining; @@ -85,16 +91,21 @@ class S3BackedInputStreamTest extends S3TestBase { @Test public void testReadingFullObject() throws Exception { - String objectKey = "foobar"; + String objectKey = "testReadingFull"; int objectSize = 12345; RandomInputStream toWrite = new RandomInputStream(0, objectSize); RandomInputStream toCompare = new RandomInputStream(0, objectSize); - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(objectSize); - s3client.putObject(BUCKET, objectKey, toWrite, metadata); + Payload payload = Payloads.newInputStreamPayload(toWrite); + payload.getContentMetadata().setContentLength((long)objectSize); + Blob blob = blobStore.blobBuilder(objectKey) + .payload(payload) + .contentLength((long)objectSize) + .build(); + String ret = blobStore.putBlob(BUCKET, blob); + log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", objectKey, BUCKET, ret); - BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); assertStreamsMatch(toTest, toCompare); @@ -102,24 +113,29 @@ class S3BackedInputStreamTest extends S3TestBase { @Test public void testReadingFullObjectByBytes() throws Exception { - String objectKey = "foobar"; + String objectKey = "testReadingFull2"; int objectSize = 12345; RandomInputStream toWrite = new RandomInputStream(0, objectSize); RandomInputStream toCompare = new RandomInputStream(0, objectSize); - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(objectSize); - s3client.putObject(BUCKET, objectKey, toWrite, metadata); + Payload payload = Payloads.newInputStreamPayload(toWrite); + payload.getContentMetadata().setContentLength((long)objectSize); + Blob blob = blobStore.blobBuilder(objectKey) + .payload(payload) + .contentLength((long)objectSize) + .build(); + String ret = blobStore.putBlob(BUCKET, blob); + log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", objectKey, BUCKET, ret); - BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); assertStreamsMatchByBytes(toTest, toCompare); } @Test(expectedExceptions = IOException.class) - public void testErrorOnS3Read() throws Exception { - BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, "doesn't exist", + public void testErrorOnRead() throws Exception { + BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, "doesn't exist", (key, md) -> {}, 1234, 1000); toTest.read(); @@ -128,7 +144,7 @@ class S3BackedInputStreamTest extends S3TestBase { @Test public void testSeek() throws Exception { - String objectKey = "foobar"; + String objectKey = "testSeek"; int objectSize = 12345; RandomInputStream toWrite = new RandomInputStream(0, objectSize); @@ -141,11 +157,16 @@ class S3BackedInputStreamTest extends S3TestBase { seeks.put(seek, stream); } - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(objectSize); - s3client.putObject(BUCKET, objectKey, toWrite, metadata); + Payload payload = Payloads.newInputStreamPayload(toWrite); + payload.getContentMetadata().setContentLength((long)objectSize); + Blob blob = blobStore.blobBuilder(objectKey) + .payload(payload) + .contentLength((long)objectSize) + .build(); + String ret = blobStore.putBlob(BUCKET, blob); + log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", objectKey, BUCKET, ret); - BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); for (Map.Entry<Integer, InputStream> e : seeks.entrySet()) { @@ -156,16 +177,23 @@ class S3BackedInputStreamTest extends S3TestBase { @Test public void testSeekWithinCurrent() throws Exception { - String objectKey = "foobar"; + String objectKey = "testSeekWithinCurrent"; int objectSize = 12345; RandomInputStream toWrite = new RandomInputStream(0, objectSize); - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(objectSize); - s3client.putObject(BUCKET, objectKey, toWrite, metadata); + Payload payload = Payloads.newInputStreamPayload(toWrite); + payload.getContentMetadata().setContentLength((long)objectSize); + Blob blob = blobStore.blobBuilder(objectKey) + .payload(payload) + .contentLength((long)objectSize) + .build(); + String ret = blobStore.putBlob(BUCKET, blob); + log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", objectKey, BUCKET, ret); + + //BlobStore spiedBlobStore = spy(blobStore); + BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore)); - AmazonS3 spiedClient = spy(s3client); - BackedInputStream toTest = new S3BackedInputStreamImpl(spiedClient, BUCKET, objectKey, + BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(spiedBlobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); @@ -193,20 +221,26 @@ class S3BackedInputStreamTest extends S3TestBase { Assert.assertEquals(thirdSeek.read(), toTest.read()); } - verify(spiedClient, times(1)).getObject(anyObject()); + verify(spiedBlobStore, times(1)) + .getBlob(Mockito.eq(BUCKET), Mockito.eq(objectKey), Matchers.<GetOptions>anyObject()); } @Test public void testSeekForward() throws Exception { - String objectKey = "foobar"; + String objectKey = "testSeekForward"; int objectSize = 12345; RandomInputStream toWrite = new RandomInputStream(0, objectSize); - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(objectSize); - s3client.putObject(BUCKET, objectKey, toWrite, metadata); + Payload payload = Payloads.newInputStreamPayload(toWrite); + payload.getContentMetadata().setContentLength((long)objectSize); + Blob blob = blobStore.blobBuilder(objectKey) + .payload(payload) + .contentLength((long)objectSize) + .build(); + String ret = blobStore.putBlob(BUCKET, blob); + log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", objectKey, BUCKET, ret); - BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java similarity index 53% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java index f2ea6c4..d1474f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java @@ -18,27 +18,39 @@ */ package org.apache.pulsar.broker.offload; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; - +import org.jclouds.ContextBuilder; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.BlobStoreContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; -public class S3TestBase { +public class BlobStoreTestBase { + private static final Logger log = LoggerFactory.getLogger(BlobStoreTestBase.class); + public final static String BUCKET = "pulsar-unittest"; - protected AmazonS3 s3client = null; + protected BlobStoreContext context = null; + protected BlobStore blobStore = null; @BeforeMethod public void start() throws Exception { - if (Boolean.parseBoolean(System.getProperty("testRealAWS", "false"))) { - // To use this, ~/.aws must be configured with credentials and a default region - s3client = AmazonS3ClientBuilder.standard().build(); - } else { - s3client = new S3Mock(); + context = ContextBuilder.newBuilder("transient").build(BlobStoreContext.class); + blobStore = context.getBlobStore(); + boolean create = blobStore.createContainerInLocation(null, BUCKET); + + log.debug("TestBase Create Bucket: {}, in blobStore, result: {}", BUCKET, create); + } + + @AfterMethod + public void tearDown() { + if (blobStore != null) { + blobStore.deleteContainer(BUCKET); } - if (!s3client.doesBucketExistV2(BUCKET)) { - s3client.createBucket(BUCKET); + if (context != null) { + context.close(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java deleted file mode 100644 index 4bfc140..0000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java +++ /dev/null @@ -1,334 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.offload; - -import com.amazonaws.services.s3.AbstractAmazonS3; -import com.amazonaws.services.s3.model.AmazonS3Exception; -import com.amazonaws.services.s3.model.Bucket; -import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; -import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; -import com.amazonaws.services.s3.model.CopyObjectRequest; -import com.amazonaws.services.s3.model.CopyObjectResult; -import com.amazonaws.services.s3.model.DeleteObjectRequest; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.DeleteObjectsResult; -import com.amazonaws.services.s3.model.GetObjectMetadataRequest; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.PutObjectResult; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.UploadPartRequest; -import com.amazonaws.services.s3.model.UploadPartResult; - -import com.google.common.collect.ComparisonChain; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.stream.Collectors; - -/** - * Minimal mock for amazon client. - * If making any changes, validate they behave the same as S3 by running all S3 tests with -DtestRealAWS=true - */ -class S3Mock extends AbstractAmazonS3 { - @Override - public boolean doesBucketExistV2(String bucketName) { - return buckets.containsKey(bucketName); - } - - @Override - public boolean doesObjectExist(String bucketName, String objectName) { - return buckets.containsKey(bucketName) && getBucket(bucketName).hasObject(objectName); - } - - @Override - public Bucket createBucket(String bucketName) { - return buckets.computeIfAbsent(bucketName, (k) -> new MockBucket(k)); - } - - private MockBucket getBucket(String bucketName) throws AmazonS3Exception { - MockBucket bucket = buckets.get(bucketName); - if (bucket != null) { - return bucket; - } else { - throw new AmazonS3Exception("NoSuchBucket: Bucket doesn't exist"); - } - } - - @Override - public PutObjectResult putObject(PutObjectRequest putObjectRequest) - throws AmazonS3Exception { - return getBucket(putObjectRequest.getBucketName()).putObject(putObjectRequest); - } - - @Override - public S3Object getObject(GetObjectRequest getObjectRequest) { - return getBucket(getObjectRequest.getBucketName()).getObject(getObjectRequest); - } - - @Override - public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest) - throws AmazonS3Exception { - return getBucket(getObjectMetadataRequest.getBucketName()).getObjectMetadata(getObjectMetadataRequest); - } - - @Override - public void deleteObject(DeleteObjectRequest deleteObjectRequest) - throws AmazonS3Exception { - getBucket(deleteObjectRequest.getBucketName()).deleteObject(deleteObjectRequest.getKey()); - } - - @Override - public DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteObjectsRequest) - throws AmazonS3Exception { - List<DeleteObjectsResult.DeletedObject> results = deleteObjectsRequest.getKeys().stream().map((k) -> { - getBucket(deleteObjectsRequest.getBucketName()).deleteObject(k.getKey()); - DeleteObjectsResult.DeletedObject res = new DeleteObjectsResult.DeletedObject(); - res.setKey(k.getKey()); - return res; - }).collect(Collectors.toList()); - return new DeleteObjectsResult(results); - } - - @Override - public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest) - throws AmazonS3Exception { - S3Object from = getObject(new GetObjectRequest(copyObjectRequest.getSourceBucketName(), - copyObjectRequest.getSourceKey())); - ObjectMetadata newMetadata = copyObjectRequest.getNewObjectMetadata(); - if (newMetadata == null) { - newMetadata = from.getObjectMetadata(); - } - newMetadata.setContentLength(from.getObjectMetadata().getContentLength()); - putObject(new PutObjectRequest(copyObjectRequest.getDestinationBucketName(), - copyObjectRequest.getDestinationKey(), - from.getObjectContent(), - newMetadata)); - return new CopyObjectResult(); - } - - @Override - public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest request) - throws AmazonS3Exception { - return getBucket(request.getBucketName()).initMultipart(request); - } - - @Override - public UploadPartResult uploadPart(UploadPartRequest request) - throws AmazonS3Exception { - return getBucket(request.getBucketName()).uploadPart(request); - } - - @Override - public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request) - throws AmazonS3Exception { - return getBucket(request.getBucketName()).completeMultipart(request); - } - - ConcurrentHashMap<String, MockBucket> buckets = new ConcurrentHashMap<>(); - - static class MockBucket extends Bucket { - ConcurrentHashMap<String, MockObject> objects = new ConcurrentHashMap<>(); - ConcurrentHashMap<String, MockMultipart> inprogressMultipart = new ConcurrentHashMap<>(); - - MockBucket(String name) { - super(name); - } - - boolean hasObject(String key) { - return objects.containsKey(key); - } - - PutObjectResult putObject(PutObjectRequest putObjectRequest) throws AmazonS3Exception { - byte[] bytes = streamToBytes(putObjectRequest.getInputStream(), - (int)putObjectRequest.getMetadata().getContentLength()); - objects.put(putObjectRequest.getKey(), - new MockObject(putObjectRequest.getMetadata(), bytes)); - return new PutObjectResult(); - } - - S3Object getObject(GetObjectRequest getObjectRequest) throws AmazonS3Exception { - MockObject obj = objects.get(getObjectRequest.getKey()); - if (obj == null) { - throw new AmazonS3Exception("Object doesn't exist"); - } - - S3Object s3obj = new S3Object(); - s3obj.setBucketName(getObjectRequest.getBucketName()); - s3obj.setKey(getObjectRequest.getKey()); - - if (getObjectRequest.getRange() != null) { - long[] range = getObjectRequest.getRange(); - int size = (int)(range[1] - range[0] + 1); - ObjectMetadata metadata = obj.metadata.clone(); - metadata.setHeader("Content-Range", - String.format("bytes %d-%d/%d", - range[0], range[1], size)); - s3obj.setObjectMetadata(metadata); - s3obj.setObjectContent(new ByteArrayInputStream(obj.data, (int)range[0], size)); - return s3obj; - } else { - s3obj.setObjectMetadata(obj.metadata); - s3obj.setObjectContent(new ByteArrayInputStream(obj.data)); - return s3obj; - } - } - - void deleteObject(String key) { - objects.remove(key); - } - - ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest) - throws AmazonS3Exception { - MockObject obj = objects.get(getObjectMetadataRequest.getKey()); - if (obj == null) { - throw new AmazonS3Exception("Object doesn't exist"); - } - return obj.metadata; - } - - InitiateMultipartUploadResult initMultipart(InitiateMultipartUploadRequest request) - throws AmazonS3Exception { - String uploadId = UUID.randomUUID().toString(); - inprogressMultipart.put(uploadId, new MockMultipart(request.getKey(), - request.getObjectMetadata())); - InitiateMultipartUploadResult result = new InitiateMultipartUploadResult(); - result.setBucketName(request.getBucketName()); - result.setKey(request.getKey()); - result.setUploadId(uploadId); - return result; - } - - MockMultipart getMultipart(String uploadId, String key) throws AmazonS3Exception { - MockMultipart multi = inprogressMultipart.get(uploadId); - if (multi == null) { - throw new AmazonS3Exception("No such upload " + uploadId); - } - if (!multi.key.equals(key)) { - throw new AmazonS3Exception("Wrong key for upload " + uploadId - + ", expected " + key - + ", got " + multi.key); - } - return multi; - } - - UploadPartResult uploadPart(UploadPartRequest request) - throws AmazonS3Exception { - MockMultipart multi = getMultipart(request.getUploadId(), request.getKey()); - byte[] bytes = streamToBytes(request.getInputStream(), (int)request.getPartSize()); - UploadPartResult result = new UploadPartResult(); - result.setPartNumber(request.getPartNumber()); - result.setETag(multi.addPart(request.getPartNumber(), bytes)); - return result; - } - - CompleteMultipartUploadResult completeMultipart(CompleteMultipartUploadRequest request) - throws AmazonS3Exception { - MockMultipart multi = getMultipart(request.getUploadId(), request.getKey()); - inprogressMultipart.remove(request.getUploadId()); - objects.put(request.getKey(), multi.complete(request.getPartETags())); - CompleteMultipartUploadResult result = new CompleteMultipartUploadResult(); - result.setBucketName(request.getBucketName()); - result.setKey(request.getKey()); - return result; - } - } - - private static byte[] streamToBytes(InputStream data, int length) throws AmazonS3Exception { - byte[] bytes = new byte[length]; - try { - for (int i = 0; i < length; i++) { - bytes[i] = (byte)data.read(); - } - } catch (IOException ioe) { - throw new AmazonS3Exception("Error loading data", ioe); - } - return bytes; - } - - static class MockObject { - final ObjectMetadata metadata; - final byte[] data; - final Map<Integer, long[]> partRanges; - - - MockObject(ObjectMetadata metadata, byte[] data) { - this(metadata, data, null); - } - - MockObject(ObjectMetadata metadata, byte[] data, Map<Integer, long[]> partRanges) { - this.metadata = metadata; - this.data = data; - this.partRanges = partRanges; - } - - } - - static class MockMultipart { - final String key; - final ObjectMetadata metadata; - final ConcurrentSkipListMap<PartETag, byte[]> parts = new ConcurrentSkipListMap<>( - (etag1, etag2) -> ComparisonChain.start().compare(etag1.getPartNumber(), - etag2.getPartNumber()).result()); - - MockMultipart(String key, ObjectMetadata metadata) { - this.key = key; - this.metadata = metadata; - } - - String addPart(int partNumber, byte[] bytes) { - String etag = UUID.randomUUID().toString(); - parts.put(new PartETag(partNumber, etag), bytes); - return etag; - } - - MockObject complete(List<PartETag> tags) throws AmazonS3Exception { - if (parts.size() != tags.size() - || !parts.keySet().containsAll(tags)) { - throw new AmazonS3Exception("Tags don't match uploaded parts"); - } - - int totalSize = parts.values().stream().map(v -> v.length).reduce(0, (acc, v) -> acc + v); - byte[] full = new byte[totalSize]; - - Map<Integer, long[]> partRanges = new HashMap<>(); - int start = 0; - for (Map.Entry<PartETag, byte[]> e : parts.entrySet()) { - int partLength = e.getValue().length; - System.arraycopy(e.getValue(), 0, full, start, partLength); - partRanges.put(e.getKey().getPartNumber(), - new long[] { start, start + partLength - 1 }); - start += partLength; - } - metadata.setContentLength(totalSize); - return new MockObject(metadata, full, partRanges); - } - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java similarity index 71% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java index 7b9d9a2..19463d2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -18,24 +18,23 @@ */ package org.apache.pulsar.broker.offload.impl; -import static org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.dataBlockOffloadKey; -import static org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.indexBlockOffloadKey; +import static org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.dataBlockOffloadKey; +import static org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.indexBlockOffloadKey; +import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.CopyObjectRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import java.lang.reflect.Method; import java.io.IOException; +import java.lang.reflect.Method; import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; - import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -51,21 +50,25 @@ import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.offload.S3TestBase; -import org.apache.pulsar.broker.offload.impl.DataBlockHeaderImpl; -import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader; +import org.apache.pulsar.broker.offload.BlobStoreTestBase; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.options.CopyOptions; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.Test; @Slf4j -class S3ManagedLedgerOffloaderTest extends S3TestBase { +class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase { + private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderTest.class); + private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024; private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024; final OrderedScheduler scheduler; final MockBookKeeper bk; - S3ManagedLedgerOffloaderTest() throws Exception { + BlobStoreManagedLedgerOffloaderTest() throws Exception { scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build(); bk = new MockBookKeeper(MockedPulsarServiceBaseTest.createMockZooKeeper()); } @@ -114,31 +117,32 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { @Test public void testHappyCase() throws Exception { - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get(); } @Test public void testBucketDoesNotExist() throws Exception { - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, "no-bucket", scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, "no-bucket", scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); try { offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get(); Assert.fail("Shouldn't be able to add to bucket"); } catch (ExecutionException e) { - Assert.assertTrue(e.getMessage().contains("NoSuchBucket")); + log.error("Exception: ", e.getMessage()); + Assert.assertTrue(e.getMessage().contains("not found")); } } @Test public void testNoRegionConfigured() throws Exception { ServiceConfiguration conf = new ServiceConfiguration(); - conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME); + conf.setManagedLedgerOffloadDriver("s3"); conf.setS3ManagedLedgerOffloadBucket(BUCKET); try { - S3ManagedLedgerOffloader.create(conf, scheduler); + BlobStoreManagedLedgerOffloader.create(conf, scheduler); Assert.fail("Should have thrown exception"); } catch (PulsarServerException pse) { // correct @@ -148,11 +152,11 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { @Test public void testNoBucketConfigured() throws Exception { ServiceConfiguration conf = new ServiceConfiguration(); - conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME); + conf.setManagedLedgerOffloadDriver("s3"); conf.setS3ManagedLedgerOffloadRegion("eu-west-1"); try { - S3ManagedLedgerOffloader.create(conf, scheduler); + BlobStoreManagedLedgerOffloader.create(conf, scheduler); Assert.fail("Should have thrown exception"); } catch (PulsarServerException pse) { // correct @@ -162,13 +166,13 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { @Test public void testSmallBlockSizeConfigured() throws Exception { ServiceConfiguration conf = new ServiceConfiguration(); - conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME); + conf.setManagedLedgerOffloadDriver("s3"); conf.setS3ManagedLedgerOffloadRegion("eu-west-1"); conf.setS3ManagedLedgerOffloadBucket(BUCKET); conf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(1024); try { - S3ManagedLedgerOffloader.create(conf, scheduler); + BlobStoreManagedLedgerOffloader.create(conf, scheduler); Assert.fail("Should have thrown exception"); } catch (PulsarServerException pse) { // correct @@ -178,7 +182,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { @Test public void testOffloadAndRead() throws Exception { ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); @@ -213,21 +217,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { // mock throw exception when initiateMultipartUpload try { - AmazonS3 mockS3client = Mockito.spy(s3client); + + BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore)); Mockito - .doThrow(new AmazonServiceException(failureString)) - .when(mockS3client).initiateMultipartUpload(any()); + .doThrow(new RuntimeException(failureString)) + .when(spiedBlobStore).initiateMultipartUpload(any(), any(), any()); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); offloader.offload(readHandle, uuid, new HashMap<>()).get(); Assert.fail("Should throw exception when initiateMultipartUpload"); } catch (Exception e) { // excepted - Assert.assertTrue(e.getCause() instanceof AmazonServiceException); + Assert.assertTrue(e.getCause() instanceof RuntimeException); Assert.assertTrue(e.getCause().getMessage().contains(failureString)); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } } @@ -239,22 +244,21 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { // mock throw exception when uploadPart try { - AmazonS3 mockS3client = Mockito.spy(s3client); + BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore)); Mockito - .doThrow(new AmazonServiceException("fail DataBlockPartUpload")) - .when(mockS3client).uploadPart(any()); - Mockito.doNothing().when(mockS3client).abortMultipartUpload(any()); + .doThrow(new RuntimeException(failureString)) + .when(spiedBlobStore).uploadMultipartPart(any(), anyInt(), any()); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, - DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); offloader.offload(readHandle, uuid, new HashMap<>()).get(); Assert.fail("Should throw exception for when uploadPart"); } catch (Exception e) { // excepted - Assert.assertTrue(e.getCause() instanceof AmazonServiceException); + Assert.assertTrue(e.getCause() instanceof RuntimeException); Assert.assertTrue(e.getCause().getMessage().contains(failureString)); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } } @@ -266,22 +270,25 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { // mock throw exception when completeMultipartUpload try { - AmazonS3 mockS3client = Mockito.spy(s3client); + BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore)); + Mockito + .doThrow(new RuntimeException(failureString)) + .when(spiedBlobStore).completeMultipartUpload(any(), any()); Mockito - .doThrow(new AmazonServiceException(failureString)) - .when(mockS3client).completeMultipartUpload(any()); - Mockito.doNothing().when(mockS3client).abortMultipartUpload(any()); + .doNothing() + .when(spiedBlobStore).abortMultipartUpload(any()); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, - DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); offloader.offload(readHandle, uuid, new HashMap<>()).get(); + Assert.fail("Should throw exception for when completeMultipartUpload"); } catch (Exception e) { // excepted - Assert.assertTrue(e.getCause() instanceof AmazonServiceException); + Assert.assertTrue(e.getCause() instanceof RuntimeException); Assert.assertTrue(e.getCause().getMessage().contains(failureString)); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } } @@ -293,21 +300,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { // mock throw exception when putObject try { - AmazonS3 mockS3client = Mockito.spy(s3client); + BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore)); Mockito - .doThrow(new AmazonServiceException(failureString)) - .when(mockS3client).putObject(any()); + .doThrow(new RuntimeException(failureString)) + .when(spiedBlobStore).putBlob(any(), any()); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, - DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); offloader.offload(readHandle, uuid, new HashMap<>()).get(); + Assert.fail("Should throw exception for when putObject for index block"); } catch (Exception e) { // excepted - Assert.assertTrue(e.getCause() instanceof AmazonServiceException); + Assert.assertTrue(e.getCause() instanceof RuntimeException); Assert.assertTrue(e.getCause().getMessage().contains(failureString)); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } } @@ -328,7 +336,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { randomAccesses[i][1] = second; } - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); @@ -360,7 +368,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { @Test public void testOffloadReadInvalidEntryIds() throws Exception { ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); @@ -385,46 +393,47 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { public void testDeleteOffloaded() throws Exception { ReadHandle readHandle = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); UUID uuid = UUID.randomUUID(); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); // verify object exist after offload offloader.offload(readHandle, uuid, new HashMap<>()).get(); - Assert.assertTrue(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertTrue(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); // verify object deleted after delete offloader.deleteOffloaded(readHandle.getId(), uuid).get(); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } @Test public void testDeleteOffloadedFail() throws Exception { + String failureString = "fail deleteOffloaded"; ReadHandle readHandle = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); UUID uuid = UUID.randomUUID(); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, - DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); - String failureString = "fail deleteOffloaded"; - AmazonS3 mockS3client = Mockito.spy(s3client); + BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore)); + Mockito - .doThrow(new AmazonServiceException(failureString)) - .when(mockS3client).deleteObjects(any()); + .doThrow(new RuntimeException(failureString)) + .when(spiedBlobStore).removeBlobs(any(), any()); + + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); try { // verify object exist after offload offloader.offload(readHandle, uuid, new HashMap<>()).get(); - Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); offloader.deleteOffloaded(readHandle.getId(), uuid).get(); } catch (Exception e) { // expected - Assert.assertTrue(e.getCause() instanceof AmazonServiceException); Assert.assertTrue(e.getCause().getMessage().contains(failureString)); // verify object still there. - Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } } @@ -441,7 +450,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { Mockito.doReturn(1234L).when(readHandle).getId(); UUID uuid = UUID.randomUUID(); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); try { offloader.offload(readHandle, uuid, new HashMap<>()).get(); @@ -454,48 +463,51 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { @Test public void testReadUnknownDataVersion() throws Exception { ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); String dataKey = dataBlockOffloadKey(toWrite.getId(), uuid); - ObjectMetadata md = s3client.getObjectMetadata(BUCKET, dataKey); - md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345)); - s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET, dataKey).withNewObjectMetadata(md)); + + Map<String, String> userMeta = blobStore.blobMetadata(BUCKET, dataKey).getUserMetadata(); + userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345)); + blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, CopyOptions.builder().userMetadata(userMeta).build()); try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) { toRead.readAsync(0, 0).get(); Assert.fail("Shouldn't have been able to read"); } catch (ExecutionException e) { + log.error("Exception: ", e); Assert.assertEquals(e.getCause().getClass(), IOException.class); - Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version")); + Assert.assertTrue(e.getCause().getMessage().contains("Error reading from BlobStore")); } - md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345)); - s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET, dataKey).withNewObjectMetadata(md)); + userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345)); + blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, CopyOptions.builder().userMetadata(userMeta).build()); try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) { toRead.readAsync(0, 0).get(); Assert.fail("Shouldn't have been able to read"); } catch (ExecutionException e) { Assert.assertEquals(e.getCause().getClass(), IOException.class); - Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version")); + Assert.assertTrue(e.getCause().getMessage().contains("Error reading from BlobStore")); } } @Test public void testReadUnknownIndexVersion() throws Exception { ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); String indexKey = indexBlockOffloadKey(toWrite.getId(), uuid); - ObjectMetadata md = s3client.getObjectMetadata(BUCKET, indexKey); - md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345)); - s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET, indexKey).withNewObjectMetadata(md)); + + Map<String, String> userMeta = blobStore.blobMetadata(BUCKET, indexKey).getUserMetadata(); + userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345)); + blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, CopyOptions.builder().userMetadata(userMeta).build()); try { offloader.readOffloaded(toWrite.getId(), uuid).get(); @@ -505,8 +517,8 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version")); } - md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345)); - s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET, indexKey).withNewObjectMetadata(md)); + userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345)); + blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, CopyOptions.builder().userMetadata(userMeta).build()); try { offloader.readOffloaded(toWrite.getId(), uuid).get(); diff --git a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml index 6b8eaa4..ec46571 100644 --- a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml +++ b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml @@ -148,7 +148,9 @@ pulsar-proxy*: networkMode: pulsarnet* s3*: - image: adobe/s3mock + ## use latest adobe/s3mock, for issue: https://github.com/adobe/S3Mock/issues/32 + ## TODO: https://github.com/apache/incubator-pulsar/issues/2133 + image: apachepulsar/s3mock await: strategy: org.apache.pulsar.tests.NoopAwaitStrategy env: