sijie closed pull request #2065: GCS offload support(2): replace `s3client` api with `jclouds` related api URL: https://github.com/apache/incubator-pulsar/pull/2065
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/conf/broker.conf b/conf/broker.conf index 2e40d82a87..96433aa517 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -486,7 +486,7 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe ### --- Ledger Offloading --- ### -# Driver to use to offload old data to long term storage (Possible values: S3) +# Driver to use to offload old data to long term storage (Possible values: S3, aws-s3, google-cloud-storage) managedLedgerOffloadDriver= # Maximum number of thread pool threads for ledger offloading diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 33b97481da..85a97e5441 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -334,10 +334,10 @@ The Apache Software License, Version 2.0 - io.swagger-swagger-annotations-1.5.3.jar - io.swagger-swagger-core-1.5.3.jar - io.swagger-swagger-models-1.5.3.jar - * DataSketches + * DataSketches - com.yahoo.datasketches-memory-0.8.3.jar - com.yahoo.datasketches-sketches-core-0.8.3.jar - * Apache Commons + * Apache Commons - commons-beanutils-commons-beanutils-1.7.0.jar - commons-beanutils-commons-beanutils-core-1.8.0.jar - commons-cli-commons-cli-1.2.jar @@ -461,6 +461,12 @@ The Apache Software License, Version 2.0 - org.xerial.snappy-snappy-java-1.1.1.3.jar * Flatbuffers Java - com.google.flatbuffers-flatbuffers-java-1.9.0.jar + * Apache Jclouds + - org.apache.jclouds-allblobstore-2.2.0-SNAPSHOT.jar + * Google Guice Core Library + - com.google.inject.guice-3.0.jar + - com.google.inject.extensions:guice-multibindings-3.0.jar + - com.google.inject.extensions:guice-assistedinject-3.0.jar BSD 3-clause "New" or "Revised" License diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml new file mode 100644 index 0000000000..8aa1786c90 --- /dev/null +++ b/jclouds-shaded/pom.xml @@ -0,0 +1,105 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar</artifactId> + <version>2.2.0-incubating-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>jclouds-shaded</artifactId> + <name>Apache Pulsar :: Jclouds shaded</name> + + <dependencies> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.5</version> + </dependency> + <dependency> + <groupId>org.apache.jclouds</groupId> + <artifactId>jclouds-allblobstore</artifactId> + <version>2.2.0-SNAPSHOT</version> + </dependency> + </dependencies> + + <repositories> + <repository> + <id>jclouds-snapshots</id> + <url>https://repository.apache.org/content/repositories/snapshots</url> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + </repositories> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + <promoteTransitiveDependencies>true</promoteTransitiveDependencies> + <minimizeJar>false</minimizeJar> + + <artifactSet> + <includes> + <include>com.google.code.gson:gson</include> + <include>com.google.guava:guava</include> + <include>org.apache.jclouds:*</include> + <include>org.apache.jclouds.api:*</include> + <include>org.apache.jclouds.common:*</include> + <include>org.apache.jclouds.provider:*</include> + <include>com.google.inject.extensions:guice-assistedinject</include> + <include>com.google.inject:guice</include> + <include>com.google.inject.extensions:guice-multibindings</include> + </includes> + </artifactSet> + + <relocations> + <relocation> + <pattern>com.google</pattern> + <shadedPattern>org.apache.pulsar.shaded.com.google</shadedPattern> + </relocation> + </relocations> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> + <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" /> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/pom.xml b/pom.xml index a6d645379b..88c2591ad9 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,8 @@ flexible messaging model and an intuitive client API.</description> <module>pulsar-zookeeper</module> <module>pulsar-log4j2-appender</module> <module>protobuf-shaded</module> + <!-- jclouds shaded for gson conflict: https://issues.apache.org/jira/browse/JCLOUDS-1166 --> + <module>jclouds-shaded</module> <!-- functions-related modules --> <module>pulsar-functions</module> diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index d505c7a881..ac264bdf5b 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -127,7 +127,7 @@ <dependency> <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-s3</artifactId> + <artifactId>aws-java-sdk-core</artifactId> </dependency> <!-- functions related dependencies (begin) --> @@ -273,6 +273,12 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>jclouds-shaded</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 2a341afba7..c0aa9d2cfb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -61,7 +61,7 @@ import org.apache.pulsar.broker.loadbalance.LoadSheddingTask; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader; +import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; @@ -657,8 +657,8 @@ public LedgerOffloader getManagedLedgerOffloader() { public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf) throws PulsarServerException { if (conf.getManagedLedgerOffloadDriver() != null - && conf.getManagedLedgerOffloadDriver().equalsIgnoreCase(S3ManagedLedgerOffloader.DRIVER_NAME)) { - return S3ManagedLedgerOffloader.create(conf, getOffloaderScheduler(conf)); + && BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver())) { + return BlobStoreManagedLedgerOffloader.create(conf, getOffloaderScheduler(conf)); } else { return NullLedgerOffloader.INSTANCE; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java similarity index 68% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java index e55e61bf54..19fac5902a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java @@ -18,27 +18,22 @@ */ package org.apache.pulsar.broker.offload.impl; -import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.S3Object; - import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; - -import java.io.InputStream; import java.io.IOException; - +import java.io.InputStream; import org.apache.pulsar.broker.offload.BackedInputStream; -import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck; - +import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.VersionCheck; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.options.GetOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class S3BackedInputStreamImpl extends BackedInputStream { - private static final Logger log = LoggerFactory.getLogger(S3BackedInputStreamImpl.class); +public class BlobStoreBackedInputStreamImpl extends BackedInputStream { + private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedInputStreamImpl.class); - private final AmazonS3 s3client; + private final BlobStore blobStore; private final String bucket; private final String key; private final VersionCheck versionCheck; @@ -50,10 +45,10 @@ private long bufferOffsetStart; private long bufferOffsetEnd; - public S3BackedInputStreamImpl(AmazonS3 s3client, String bucket, String key, - VersionCheck versionCheck, - long objectLen, int bufferSize) { - this.s3client = s3client; + public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String key, + VersionCheck versionCheck, + long objectLen, int bufferSize) { + this.blobStore = blobStore; this.bucket = bucket; this.key = key; this.versionCheck = versionCheck; @@ -76,26 +71,24 @@ private boolean refillBufferIfNeeded() throws IOException { long startRange = cursor; long endRange = Math.min(cursor + bufferSize - 1, objectLen - 1); - GetObjectRequest req = new GetObjectRequest(bucket, key) - .withRange(startRange, endRange); - log.debug("Reading range {}-{} from {}/{}", startRange, endRange, bucket, key); - try (S3Object obj = s3client.getObject(req)) { - versionCheck.check(key, obj.getObjectMetadata()); - - Long[] range = obj.getObjectMetadata().getContentRange(); - long bytesRead = range[1] - range[0] + 1; - buffer.clear(); - bufferOffsetStart = range[0]; - bufferOffsetEnd = range[1]; - InputStream s = obj.getObjectContent(); - int bytesToCopy = (int)bytesRead; - while (bytesToCopy > 0) { - bytesToCopy -= buffer.writeBytes(s, bytesToCopy); + try { + Blob blob = blobStore.getBlob(bucket, key, new GetOptions().range(startRange, endRange)); + versionCheck.check(key, blob); + + try (InputStream stream = blob.getPayload().openStream()) { + buffer.clear(); + bufferOffsetStart = startRange; + bufferOffsetEnd = endRange; + long bytesRead = endRange - startRange + 1; + int bytesToCopy = (int) bytesRead; + while (bytesToCopy > 0) { + bytesToCopy -= buffer.writeBytes(stream, bytesToCopy); + } + cursor += buffer.readableBytes(); } - cursor += buffer.readableBytes(); - } catch (AmazonClientException e) { - throw new IOException("Error reading from S3", e); + } catch (Throwable e) { + throw new IOException("Error reading from BlobStore", e); } } return true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java similarity index 82% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java index 08b5ea6c37..36b382be96 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java @@ -19,13 +19,8 @@ package org.apache.pulsar.broker.offload.impl; import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.S3Object; - import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; - import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; @@ -33,7 +28,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; - import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -42,18 +36,18 @@ import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; - +import org.apache.pulsar.broker.offload.BackedInputStream; import org.apache.pulsar.broker.offload.OffloadIndexBlock; import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder; import org.apache.pulsar.broker.offload.OffloadIndexEntry; -import org.apache.pulsar.broker.offload.BackedInputStream; -import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck; - +import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.VersionCheck; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.domain.Blob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class S3BackedReadHandleImpl implements ReadHandle { - private static final Logger log = LoggerFactory.getLogger(S3BackedReadHandleImpl.class); +public class BlobStoreBackedReadHandleImpl implements ReadHandle { + private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class); private final long ledgerId; private final OffloadIndexBlock index; @@ -61,9 +55,9 @@ private final DataInputStream dataStream; private final ExecutorService executor; - private S3BackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, - BackedInputStream inputStream, - ExecutorService executor) { + private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, + BackedInputStream inputStream, + ExecutorService executor) { this.ledgerId = ledgerId; this.index = index; this.inputStream = inputStream; @@ -189,22 +183,19 @@ public boolean isClosed() { } public static ReadHandle open(ScheduledExecutorService executor, - AmazonS3 s3client, String bucket, String key, String indexKey, + BlobStore blobStore, String bucket, String key, String indexKey, VersionCheck versionCheck, long ledgerId, int readBufferSize) throws AmazonClientException, IOException { - GetObjectRequest req = new GetObjectRequest(bucket, indexKey); - try (S3Object obj = s3client.getObject(req)) { - versionCheck.check(indexKey, obj.getObjectMetadata()); - - OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create(); - OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent()); - - BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key, - versionCheck, - index.getDataObjectLength(), - readBufferSize); - return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor); - } + Blob blob = blobStore.getBlob(bucket, indexKey); + versionCheck.check(indexKey, blob); + OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create(); + OffloadIndexBlock index = indexBuilder.fromStream(blob.getPayload().openStream()); + + BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key, + versionCheck, + index.getDataObjectLength(), + readBufferSize); + return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java similarity index 55% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java index ec74d2762c..528ba28348 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java @@ -18,24 +18,17 @@ */ package org.apache.pulsar.broker.offload.impl; -import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; -import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.UploadPartRequest; -import com.amazonaws.services.s3.model.UploadPartResult; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import java.io.IOException; -import java.util.LinkedList; +import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.api.ReadHandle; @@ -47,21 +40,48 @@ import org.apache.pulsar.broker.offload.OffloadIndexBlock; import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder; import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils; +import org.jclouds.Constants; +import org.jclouds.ContextBuilder; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.domain.BlobBuilder; +import org.jclouds.blobstore.domain.MultipartPart; +import org.jclouds.blobstore.domain.MultipartUpload; +import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.domain.Location; +import org.jclouds.domain.LocationBuilder; +import org.jclouds.domain.LocationScope; +import org.jclouds.io.Payload; +import org.jclouds.io.Payloads; +import org.jclouds.s3.reference.S3Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class S3ManagedLedgerOffloader implements LedgerOffloader { - private static final Logger log = LoggerFactory.getLogger(S3ManagedLedgerOffloader.class); +public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { + private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloader.class); - public static final String DRIVER_NAME = "S3"; + public static final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage"}; static final String METADATA_FORMAT_VERSION_KEY = "S3ManagedLedgerOffloaderFormatVersion"; static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion"; static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha"; static final String CURRENT_VERSION = String.valueOf(1); - private final VersionCheck VERSION_CHECK = (key, metadata) -> { - String version = metadata.getUserMetadata().get(METADATA_FORMAT_VERSION_KEY); + public static boolean driverSupported(String driver) { + return Arrays.stream(DRIVER_NAMES).anyMatch(d -> d.equalsIgnoreCase(driver)); + } + + private static void addVersionInfo(BlobBuilder blobBuilder) { + blobBuilder.userMetadata(ImmutableMap.of( + METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION, + METADATA_SOFTWARE_VERSION_KEY, PulsarBrokerVersionStringUtils.getNormalizedVersionString(), + METADATA_SOFTWARE_GITSHA_KEY, PulsarBrokerVersionStringUtils.getGitSha())); + } + + private final VersionCheck VERSION_CHECK = (key, blob) -> { + // NOTE all metadata in jclouds comes out as lowercase, in an effort to normalize the providers + String version = blob.getMetadata().getUserMetadata().get(METADATA_FORMAT_VERSION_KEY.toLowerCase()); if (version == null || !version.equals(CURRENT_VERSION)) { throw new IOException(String.format("Invalid object version %s for %s, expect %s", version, key, CURRENT_VERSION)); @@ -69,15 +89,21 @@ }; private final OrderedScheduler scheduler; - private final AmazonS3 s3client; + + // container in jclouds private final String bucket; // max block size for each data block. private int maxBlockSize; private final int readBufferSize; - public static S3ManagedLedgerOffloader create(ServiceConfiguration conf, - OrderedScheduler scheduler) + private BlobStoreContext context; + private BlobStore blobStore; + Location location = null; + + public static BlobStoreManagedLedgerOffloader create(ServiceConfiguration conf, + OrderedScheduler scheduler) throws PulsarServerException { + String driver = conf.getManagedLedgerOffloadDriver(); String region = conf.getS3ManagedLedgerOffloadRegion(); String bucket = conf.getS3ManagedLedgerOffloadBucket(); String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint(); @@ -96,23 +122,66 @@ public static S3ManagedLedgerOffloader create(ServiceConfiguration conf, throw new PulsarServerException("s3ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB"); } - AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard(); + return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler, maxBlockSize, readBufferSize, endpoint, region); + } + + // build context for jclouds BlobStoreContext + BlobStoreManagedLedgerOffloader(String driver, String container, OrderedScheduler scheduler, + int maxBlockSize, int readBufferSize, String endpoint, String region) { + this.scheduler = scheduler; + this.readBufferSize = readBufferSize; + + this.bucket = container; + this.maxBlockSize = maxBlockSize; + + Properties overrides = new Properties(); + // This property controls the number of parts being uploaded in parallel. + overrides.setProperty("jclouds.mpu.parallel.degree", "1"); + overrides.setProperty("jclouds.mpu.parts.size", Integer.toString(maxBlockSize)); + overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000"); + overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, Integer.toString(100)); + + ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver); + + AWSCredentials credentials = null; + try { + DefaultAWSCredentialsProviderChain creds = DefaultAWSCredentialsProviderChain.getInstance(); + credentials = creds.getCredentials(); + } catch (Exception e) { + log.error("Exception when get credentials for s3 ", e); + } + + String id = "accesskey"; + String key = "secretkey"; + if (credentials != null) { + id = credentials.getAWSAccessKeyId(); + key = credentials.getAWSSecretKey(); + } + contextBuilder.credentials(id, key); + if (!Strings.isNullOrEmpty(endpoint)) { - builder.setEndpointConfiguration(new EndpointConfiguration(endpoint, region)); - builder.setPathStyleAccessEnabled(true); - } else { - builder.setRegion(region); + contextBuilder.endpoint(endpoint); + overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false"); + } + if (!Strings.isNullOrEmpty(region)) { + this.location = new LocationBuilder().scope(LocationScope.REGION).id(region).description(region).build(); } - return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize, readBufferSize); + + log.info("Constructor driver: {}, host: {}, container: {}, region: {} ", driver, endpoint, bucket, region); + + contextBuilder.overrides(overrides); + this.context = contextBuilder.buildView(BlobStoreContext.class); + this.blobStore = context.getBlobStore(); } - S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, OrderedScheduler scheduler, - int maxBlockSize, int readBufferSize) { - this.s3client = s3client; - this.bucket = bucket; + // build context for jclouds BlobStoreContext + BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container, OrderedScheduler scheduler, + int maxBlockSize, int readBufferSize) { this.scheduler = scheduler; - this.maxBlockSize = maxBlockSize; this.readBufferSize = readBufferSize; + this.bucket = container; + this.maxBlockSize = maxBlockSize; + this.blobStore = blobStore; } static String dataBlockOffloadKey(long ledgerId, UUID uuid) { @@ -141,15 +210,15 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) { String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid); String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid); - ObjectMetadata dataMetadata = new ObjectMetadata(); - addVersionInfo(dataMetadata); - - InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey, dataMetadata); - InitiateMultipartUploadResult dataBlockRes = null; + MultipartUpload mpu = null; + List<MultipartPart> parts = Lists.newArrayList(); // init multi part upload for data block. try { - dataBlockRes = s3client.initiateMultipartUpload(dataBlockReq); + BlobBuilder blobBuilder = blobStore.blobBuilder(dataBlockKey); + addVersionInfo(blobBuilder); + Blob blob = blobBuilder.build(); + mpu = blobStore.initiateMultipartUpload(bucket, blob.getMetadata(), new PutOptions()); } catch (Throwable t) { promise.completeExceptionally(t); return; @@ -161,7 +230,6 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) { long startEntry = 0; int partId = 1; long entryBytesWritten = 0; - List<PartETag> etags = new LinkedList<>(); while (startEntry <= readHandle.getLastAddConfirmed()) { int blockSize = BlockAwareSegmentInputStreamImpl .calculateBlockSize(maxBlockSize, readHandle, startEntry, entryBytesWritten); @@ -169,15 +237,13 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) { try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl( readHandle, startEntry, blockSize)) { - UploadPartResult uploadRes = s3client.uploadPart( - new UploadPartRequest() - .withBucketName(bucket) - .withKey(dataBlockKey) - .withUploadId(dataBlockRes.getUploadId()) - .withInputStream(blockStream) - .withPartSize(blockSize) - .withPartNumber(partId)); - etags.add(uploadRes.getPartETag()); + Payload partPayload = Payloads.newInputStreamPayload(blockStream); + partPayload.getContentMetadata().setContentLength((long)blockSize); + partPayload.getContentMetadata().setContentType("application/octet-stream"); + parts.add(blobStore.uploadMultipartPart(mpu, partId, partPayload)); + log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}", + bucket, dataBlockKey, partId, mpu.id()); + indexBuilder.addBlock(startEntry, partId, blockSize); if (blockStream.getEndEntryId() != -1) { @@ -193,17 +259,16 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) { dataObjectLength += blockSize; } - s3client.completeMultipartUpload(new CompleteMultipartUploadRequest() - .withBucketName(bucket).withKey(dataBlockKey) - .withUploadId(dataBlockRes.getUploadId()) - .withPartETags(etags)); + blobStore.completeMultipartUpload(mpu, parts); + mpu = null; } catch (Throwable t) { try { - s3client.abortMultipartUpload( - new AbortMultipartUploadRequest(bucket, dataBlockKey, dataBlockRes.getUploadId())); + if (mpu != null) { + blobStore.abortMultipartUpload(mpu); + } } catch (Throwable throwable) { log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.", - bucket, dataBlockKey, dataBlockRes.getUploadId(), throwable); + bucket, dataBlockKey, mpu.id(), throwable); } promise.completeExceptionally(t); return; @@ -213,19 +278,22 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) { try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build(); OffloadIndexBlock.IndexInputStream indexStream = index.toStream()) { // write the index block - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(indexStream.getStreamSize()); - addVersionInfo(metadata); - - s3client.putObject(new PutObjectRequest( - bucket, - indexBlockKey, - indexStream, - metadata)); + BlobBuilder blobBuilder = blobStore.blobBuilder(indexBlockKey); + addVersionInfo(blobBuilder); + Payload indexPayload = Payloads.newInputStreamPayload(indexStream); + indexPayload.getContentMetadata().setContentLength((long)indexStream.getStreamSize()); + indexPayload.getContentMetadata().setContentType("application/octet-stream"); + + Blob blob = blobBuilder + .payload(indexPayload) + .contentLength((long)indexStream.getStreamSize()) + .build(); + + blobStore.putBlob(bucket, blob); promise.complete(null); } catch (Throwable t) { try { - s3client.deleteObject(bucket, dataBlockKey); + blobStore.removeBlob(bucket, dataBlockKey); } catch (Throwable throwable) { log.error("Failed deleteObject in bucket - {} with key - {}.", bucket, dataBlockKey, throwable); @@ -244,35 +312,31 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) { String indexKey = indexBlockOffloadKey(ledgerId, uid); scheduler.chooseThread(ledgerId).submit(() -> { try { - promise.complete(S3BackedReadHandleImpl.open(scheduler.chooseThread(ledgerId), - s3client, + promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId), + blobStore, bucket, key, indexKey, VERSION_CHECK, ledgerId, readBufferSize)); } catch (Throwable t) { + log.error("Failed readOffloaded: ", t); promise.completeExceptionally(t); } }); return promise; } - private static void addVersionInfo(ObjectMetadata metadata) { - metadata.getUserMetadata().put(METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION); - metadata.getUserMetadata().put(METADATA_SOFTWARE_VERSION_KEY, - PulsarBrokerVersionStringUtils.getNormalizedVersionString()); - metadata.getUserMetadata().put(METADATA_SOFTWARE_GITSHA_KEY, PulsarBrokerVersionStringUtils.getGitSha()); - } + @Override public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) { CompletableFuture<Void> promise = new CompletableFuture<>(); scheduler.chooseThread(ledgerId).submit(() -> { try { - s3client.deleteObjects(new DeleteObjectsRequest(bucket) - .withKeys(dataBlockOffloadKey(ledgerId, uid), indexBlockOffloadKey(ledgerId, uid))); + blobStore.removeBlobs(bucket, + ImmutableList.of(dataBlockOffloadKey(ledgerId, uid), indexBlockOffloadKey(ledgerId, uid))); promise.complete(null); } catch (Throwable t) { - log.error("Failed delete s3 Object ", t); + log.error("Failed delete Blob", t); promise.completeExceptionally(t); } }); @@ -281,7 +345,7 @@ private static void addVersionInfo(ObjectMetadata metadata) { } public interface VersionCheck { - void check(String key, ObjectMetadata md) throws IOException; + void check(String key, Blob blob) throws IOException; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java similarity index 61% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java index 45bca52d59..bde4ddee9a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java @@ -18,29 +18,35 @@ */ package org.apache.pulsar.broker.offload; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.spy; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ObjectMetadata; - -import java.io.InputStream; import java.io.IOException; +import java.io.InputStream; import java.util.HashMap; import java.util.Map; import java.util.Random; - import lombok.extern.slf4j.Slf4j; - -import org.apache.pulsar.broker.offload.impl.S3BackedInputStreamImpl; - +import org.apache.pulsar.broker.offload.impl.BlobStoreBackedInputStreamImpl; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.options.GetOptions; +import org.jclouds.io.Payload; +import org.jclouds.io.Payloads; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.Test; @Slf4j -class S3BackedInputStreamTest extends S3TestBase { +class BlobStoreBackedInputStreamTest extends BlobStoreTestBase { + private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedInputStreamTest.class); + class RandomInputStream extends InputStream { final Random r; int bytesRemaining; @@ -85,16 +91,21 @@ private void assertStreamsMatchByBytes(InputStream a, InputStream b) throws Exce @Test public void testReadingFullObject() throws Exception { - String objectKey = "foobar"; + String objectKey = "testReadingFull"; int objectSize = 12345; RandomInputStream toWrite = new RandomInputStream(0, objectSize); RandomInputStream toCompare = new RandomInputStream(0, objectSize); - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(objectSize); - s3client.putObject(BUCKET, objectKey, toWrite, metadata); + Payload payload = Payloads.newInputStreamPayload(toWrite); + payload.getContentMetadata().setContentLength((long)objectSize); + Blob blob = blobStore.blobBuilder(objectKey) + .payload(payload) + .contentLength((long)objectSize) + .build(); + String ret = blobStore.putBlob(BUCKET, blob); + log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", objectKey, BUCKET, ret); - BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); assertStreamsMatch(toTest, toCompare); @@ -102,24 +113,29 @@ public void testReadingFullObject() throws Exception { @Test public void testReadingFullObjectByBytes() throws Exception { - String objectKey = "foobar"; + String objectKey = "testReadingFull2"; int objectSize = 12345; RandomInputStream toWrite = new RandomInputStream(0, objectSize); RandomInputStream toCompare = new RandomInputStream(0, objectSize); - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(objectSize); - s3client.putObject(BUCKET, objectKey, toWrite, metadata); + Payload payload = Payloads.newInputStreamPayload(toWrite); + payload.getContentMetadata().setContentLength((long)objectSize); + Blob blob = blobStore.blobBuilder(objectKey) + .payload(payload) + .contentLength((long)objectSize) + .build(); + String ret = blobStore.putBlob(BUCKET, blob); + log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", objectKey, BUCKET, ret); - BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); assertStreamsMatchByBytes(toTest, toCompare); } @Test(expectedExceptions = IOException.class) - public void testErrorOnS3Read() throws Exception { - BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, "doesn't exist", + public void testErrorOnRead() throws Exception { + BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, "doesn't exist", (key, md) -> {}, 1234, 1000); toTest.read(); @@ -128,7 +144,7 @@ public void testErrorOnS3Read() throws Exception { @Test public void testSeek() throws Exception { - String objectKey = "foobar"; + String objectKey = "testSeek"; int objectSize = 12345; RandomInputStream toWrite = new RandomInputStream(0, objectSize); @@ -141,11 +157,16 @@ public void testSeek() throws Exception { seeks.put(seek, stream); } - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(objectSize); - s3client.putObject(BUCKET, objectKey, toWrite, metadata); + Payload payload = Payloads.newInputStreamPayload(toWrite); + payload.getContentMetadata().setContentLength((long)objectSize); + Blob blob = blobStore.blobBuilder(objectKey) + .payload(payload) + .contentLength((long)objectSize) + .build(); + String ret = blobStore.putBlob(BUCKET, blob); + log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", objectKey, BUCKET, ret); - BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); for (Map.Entry<Integer, InputStream> e : seeks.entrySet()) { @@ -156,16 +177,23 @@ public void testSeek() throws Exception { @Test public void testSeekWithinCurrent() throws Exception { - String objectKey = "foobar"; + String objectKey = "testSeekWithinCurrent"; int objectSize = 12345; RandomInputStream toWrite = new RandomInputStream(0, objectSize); - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(objectSize); - s3client.putObject(BUCKET, objectKey, toWrite, metadata); + Payload payload = Payloads.newInputStreamPayload(toWrite); + payload.getContentMetadata().setContentLength((long)objectSize); + Blob blob = blobStore.blobBuilder(objectKey) + .payload(payload) + .contentLength((long)objectSize) + .build(); + String ret = blobStore.putBlob(BUCKET, blob); + log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", objectKey, BUCKET, ret); + + //BlobStore spiedBlobStore = spy(blobStore); + BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore)); - AmazonS3 spiedClient = spy(s3client); - BackedInputStream toTest = new S3BackedInputStreamImpl(spiedClient, BUCKET, objectKey, + BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(spiedBlobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); @@ -193,20 +221,26 @@ public void testSeekWithinCurrent() throws Exception { Assert.assertEquals(thirdSeek.read(), toTest.read()); } - verify(spiedClient, times(1)).getObject(anyObject()); + verify(spiedBlobStore, times(1)) + .getBlob(Mockito.eq(BUCKET), Mockito.eq(objectKey), Matchers.<GetOptions>anyObject()); } @Test public void testSeekForward() throws Exception { - String objectKey = "foobar"; + String objectKey = "testSeekForward"; int objectSize = 12345; RandomInputStream toWrite = new RandomInputStream(0, objectSize); - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(objectSize); - s3client.putObject(BUCKET, objectKey, toWrite, metadata); + Payload payload = Payloads.newInputStreamPayload(toWrite); + payload.getContentMetadata().setContentLength((long)objectSize); + Blob blob = blobStore.blobBuilder(objectKey) + .payload(payload) + .contentLength((long)objectSize) + .build(); + String ret = blobStore.putBlob(BUCKET, blob); + log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", objectKey, BUCKET, ret); - BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey, (key, md) -> {}, objectSize, 1000); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java similarity index 53% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java index f2ea6c4b33..d1474f95fa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java @@ -18,27 +18,39 @@ */ package org.apache.pulsar.broker.offload; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; - +import org.jclouds.ContextBuilder; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.BlobStoreContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; -public class S3TestBase { +public class BlobStoreTestBase { + private static final Logger log = LoggerFactory.getLogger(BlobStoreTestBase.class); + public final static String BUCKET = "pulsar-unittest"; - protected AmazonS3 s3client = null; + protected BlobStoreContext context = null; + protected BlobStore blobStore = null; @BeforeMethod public void start() throws Exception { - if (Boolean.parseBoolean(System.getProperty("testRealAWS", "false"))) { - // To use this, ~/.aws must be configured with credentials and a default region - s3client = AmazonS3ClientBuilder.standard().build(); - } else { - s3client = new S3Mock(); + context = ContextBuilder.newBuilder("transient").build(BlobStoreContext.class); + blobStore = context.getBlobStore(); + boolean create = blobStore.createContainerInLocation(null, BUCKET); + + log.debug("TestBase Create Bucket: {}, in blobStore, result: {}", BUCKET, create); + } + + @AfterMethod + public void tearDown() { + if (blobStore != null) { + blobStore.deleteContainer(BUCKET); } - if (!s3client.doesBucketExistV2(BUCKET)) { - s3client.createBucket(BUCKET); + if (context != null) { + context.close(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java deleted file mode 100644 index 4bfc1401a8..0000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java +++ /dev/null @@ -1,334 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.offload; - -import com.amazonaws.services.s3.AbstractAmazonS3; -import com.amazonaws.services.s3.model.AmazonS3Exception; -import com.amazonaws.services.s3.model.Bucket; -import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; -import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; -import com.amazonaws.services.s3.model.CopyObjectRequest; -import com.amazonaws.services.s3.model.CopyObjectResult; -import com.amazonaws.services.s3.model.DeleteObjectRequest; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.DeleteObjectsResult; -import com.amazonaws.services.s3.model.GetObjectMetadataRequest; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.PutObjectResult; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.UploadPartRequest; -import com.amazonaws.services.s3.model.UploadPartResult; - -import com.google.common.collect.ComparisonChain; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.stream.Collectors; - -/** - * Minimal mock for amazon client. - * If making any changes, validate they behave the same as S3 by running all S3 tests with -DtestRealAWS=true - */ -class S3Mock extends AbstractAmazonS3 { - @Override - public boolean doesBucketExistV2(String bucketName) { - return buckets.containsKey(bucketName); - } - - @Override - public boolean doesObjectExist(String bucketName, String objectName) { - return buckets.containsKey(bucketName) && getBucket(bucketName).hasObject(objectName); - } - - @Override - public Bucket createBucket(String bucketName) { - return buckets.computeIfAbsent(bucketName, (k) -> new MockBucket(k)); - } - - private MockBucket getBucket(String bucketName) throws AmazonS3Exception { - MockBucket bucket = buckets.get(bucketName); - if (bucket != null) { - return bucket; - } else { - throw new AmazonS3Exception("NoSuchBucket: Bucket doesn't exist"); - } - } - - @Override - public PutObjectResult putObject(PutObjectRequest putObjectRequest) - throws AmazonS3Exception { - return getBucket(putObjectRequest.getBucketName()).putObject(putObjectRequest); - } - - @Override - public S3Object getObject(GetObjectRequest getObjectRequest) { - return getBucket(getObjectRequest.getBucketName()).getObject(getObjectRequest); - } - - @Override - public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest) - throws AmazonS3Exception { - return getBucket(getObjectMetadataRequest.getBucketName()).getObjectMetadata(getObjectMetadataRequest); - } - - @Override - public void deleteObject(DeleteObjectRequest deleteObjectRequest) - throws AmazonS3Exception { - getBucket(deleteObjectRequest.getBucketName()).deleteObject(deleteObjectRequest.getKey()); - } - - @Override - public DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteObjectsRequest) - throws AmazonS3Exception { - List<DeleteObjectsResult.DeletedObject> results = deleteObjectsRequest.getKeys().stream().map((k) -> { - getBucket(deleteObjectsRequest.getBucketName()).deleteObject(k.getKey()); - DeleteObjectsResult.DeletedObject res = new DeleteObjectsResult.DeletedObject(); - res.setKey(k.getKey()); - return res; - }).collect(Collectors.toList()); - return new DeleteObjectsResult(results); - } - - @Override - public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest) - throws AmazonS3Exception { - S3Object from = getObject(new GetObjectRequest(copyObjectRequest.getSourceBucketName(), - copyObjectRequest.getSourceKey())); - ObjectMetadata newMetadata = copyObjectRequest.getNewObjectMetadata(); - if (newMetadata == null) { - newMetadata = from.getObjectMetadata(); - } - newMetadata.setContentLength(from.getObjectMetadata().getContentLength()); - putObject(new PutObjectRequest(copyObjectRequest.getDestinationBucketName(), - copyObjectRequest.getDestinationKey(), - from.getObjectContent(), - newMetadata)); - return new CopyObjectResult(); - } - - @Override - public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest request) - throws AmazonS3Exception { - return getBucket(request.getBucketName()).initMultipart(request); - } - - @Override - public UploadPartResult uploadPart(UploadPartRequest request) - throws AmazonS3Exception { - return getBucket(request.getBucketName()).uploadPart(request); - } - - @Override - public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request) - throws AmazonS3Exception { - return getBucket(request.getBucketName()).completeMultipart(request); - } - - ConcurrentHashMap<String, MockBucket> buckets = new ConcurrentHashMap<>(); - - static class MockBucket extends Bucket { - ConcurrentHashMap<String, MockObject> objects = new ConcurrentHashMap<>(); - ConcurrentHashMap<String, MockMultipart> inprogressMultipart = new ConcurrentHashMap<>(); - - MockBucket(String name) { - super(name); - } - - boolean hasObject(String key) { - return objects.containsKey(key); - } - - PutObjectResult putObject(PutObjectRequest putObjectRequest) throws AmazonS3Exception { - byte[] bytes = streamToBytes(putObjectRequest.getInputStream(), - (int)putObjectRequest.getMetadata().getContentLength()); - objects.put(putObjectRequest.getKey(), - new MockObject(putObjectRequest.getMetadata(), bytes)); - return new PutObjectResult(); - } - - S3Object getObject(GetObjectRequest getObjectRequest) throws AmazonS3Exception { - MockObject obj = objects.get(getObjectRequest.getKey()); - if (obj == null) { - throw new AmazonS3Exception("Object doesn't exist"); - } - - S3Object s3obj = new S3Object(); - s3obj.setBucketName(getObjectRequest.getBucketName()); - s3obj.setKey(getObjectRequest.getKey()); - - if (getObjectRequest.getRange() != null) { - long[] range = getObjectRequest.getRange(); - int size = (int)(range[1] - range[0] + 1); - ObjectMetadata metadata = obj.metadata.clone(); - metadata.setHeader("Content-Range", - String.format("bytes %d-%d/%d", - range[0], range[1], size)); - s3obj.setObjectMetadata(metadata); - s3obj.setObjectContent(new ByteArrayInputStream(obj.data, (int)range[0], size)); - return s3obj; - } else { - s3obj.setObjectMetadata(obj.metadata); - s3obj.setObjectContent(new ByteArrayInputStream(obj.data)); - return s3obj; - } - } - - void deleteObject(String key) { - objects.remove(key); - } - - ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest) - throws AmazonS3Exception { - MockObject obj = objects.get(getObjectMetadataRequest.getKey()); - if (obj == null) { - throw new AmazonS3Exception("Object doesn't exist"); - } - return obj.metadata; - } - - InitiateMultipartUploadResult initMultipart(InitiateMultipartUploadRequest request) - throws AmazonS3Exception { - String uploadId = UUID.randomUUID().toString(); - inprogressMultipart.put(uploadId, new MockMultipart(request.getKey(), - request.getObjectMetadata())); - InitiateMultipartUploadResult result = new InitiateMultipartUploadResult(); - result.setBucketName(request.getBucketName()); - result.setKey(request.getKey()); - result.setUploadId(uploadId); - return result; - } - - MockMultipart getMultipart(String uploadId, String key) throws AmazonS3Exception { - MockMultipart multi = inprogressMultipart.get(uploadId); - if (multi == null) { - throw new AmazonS3Exception("No such upload " + uploadId); - } - if (!multi.key.equals(key)) { - throw new AmazonS3Exception("Wrong key for upload " + uploadId - + ", expected " + key - + ", got " + multi.key); - } - return multi; - } - - UploadPartResult uploadPart(UploadPartRequest request) - throws AmazonS3Exception { - MockMultipart multi = getMultipart(request.getUploadId(), request.getKey()); - byte[] bytes = streamToBytes(request.getInputStream(), (int)request.getPartSize()); - UploadPartResult result = new UploadPartResult(); - result.setPartNumber(request.getPartNumber()); - result.setETag(multi.addPart(request.getPartNumber(), bytes)); - return result; - } - - CompleteMultipartUploadResult completeMultipart(CompleteMultipartUploadRequest request) - throws AmazonS3Exception { - MockMultipart multi = getMultipart(request.getUploadId(), request.getKey()); - inprogressMultipart.remove(request.getUploadId()); - objects.put(request.getKey(), multi.complete(request.getPartETags())); - CompleteMultipartUploadResult result = new CompleteMultipartUploadResult(); - result.setBucketName(request.getBucketName()); - result.setKey(request.getKey()); - return result; - } - } - - private static byte[] streamToBytes(InputStream data, int length) throws AmazonS3Exception { - byte[] bytes = new byte[length]; - try { - for (int i = 0; i < length; i++) { - bytes[i] = (byte)data.read(); - } - } catch (IOException ioe) { - throw new AmazonS3Exception("Error loading data", ioe); - } - return bytes; - } - - static class MockObject { - final ObjectMetadata metadata; - final byte[] data; - final Map<Integer, long[]> partRanges; - - - MockObject(ObjectMetadata metadata, byte[] data) { - this(metadata, data, null); - } - - MockObject(ObjectMetadata metadata, byte[] data, Map<Integer, long[]> partRanges) { - this.metadata = metadata; - this.data = data; - this.partRanges = partRanges; - } - - } - - static class MockMultipart { - final String key; - final ObjectMetadata metadata; - final ConcurrentSkipListMap<PartETag, byte[]> parts = new ConcurrentSkipListMap<>( - (etag1, etag2) -> ComparisonChain.start().compare(etag1.getPartNumber(), - etag2.getPartNumber()).result()); - - MockMultipart(String key, ObjectMetadata metadata) { - this.key = key; - this.metadata = metadata; - } - - String addPart(int partNumber, byte[] bytes) { - String etag = UUID.randomUUID().toString(); - parts.put(new PartETag(partNumber, etag), bytes); - return etag; - } - - MockObject complete(List<PartETag> tags) throws AmazonS3Exception { - if (parts.size() != tags.size() - || !parts.keySet().containsAll(tags)) { - throw new AmazonS3Exception("Tags don't match uploaded parts"); - } - - int totalSize = parts.values().stream().map(v -> v.length).reduce(0, (acc, v) -> acc + v); - byte[] full = new byte[totalSize]; - - Map<Integer, long[]> partRanges = new HashMap<>(); - int start = 0; - for (Map.Entry<PartETag, byte[]> e : parts.entrySet()) { - int partLength = e.getValue().length; - System.arraycopy(e.getValue(), 0, full, start, partLength); - partRanges.put(e.getKey().getPartNumber(), - new long[] { start, start + partLength - 1 }); - start += partLength; - } - metadata.setContentLength(totalSize); - return new MockObject(metadata, full, partRanges); - } - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java similarity index 71% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java index 7b9d9a2fa9..19463d2fc0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -18,24 +18,23 @@ */ package org.apache.pulsar.broker.offload.impl; -import static org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.dataBlockOffloadKey; -import static org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.indexBlockOffloadKey; +import static org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.dataBlockOffloadKey; +import static org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.indexBlockOffloadKey; +import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.CopyObjectRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import java.lang.reflect.Method; import java.io.IOException; +import java.lang.reflect.Method; import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; - import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -51,21 +50,25 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.offload.S3TestBase; -import org.apache.pulsar.broker.offload.impl.DataBlockHeaderImpl; -import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader; +import org.apache.pulsar.broker.offload.BlobStoreTestBase; +import org.jclouds.blobstore.BlobStore; +import org.jclouds.blobstore.options.CopyOptions; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.Test; @Slf4j -class S3ManagedLedgerOffloaderTest extends S3TestBase { +class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase { + private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderTest.class); + private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024; private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024; final OrderedScheduler scheduler; final MockBookKeeper bk; - S3ManagedLedgerOffloaderTest() throws Exception { + BlobStoreManagedLedgerOffloaderTest() throws Exception { scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build(); bk = new MockBookKeeper(MockedPulsarServiceBaseTest.createMockZooKeeper()); } @@ -114,31 +117,32 @@ private ReadHandle buildReadHandle(int maxBlockSize, int blockCount) throws Exce @Test public void testHappyCase() throws Exception { - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get(); } @Test public void testBucketDoesNotExist() throws Exception { - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, "no-bucket", scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, "no-bucket", scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); try { offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get(); Assert.fail("Shouldn't be able to add to bucket"); } catch (ExecutionException e) { - Assert.assertTrue(e.getMessage().contains("NoSuchBucket")); + log.error("Exception: ", e.getMessage()); + Assert.assertTrue(e.getMessage().contains("not found")); } } @Test public void testNoRegionConfigured() throws Exception { ServiceConfiguration conf = new ServiceConfiguration(); - conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME); + conf.setManagedLedgerOffloadDriver("s3"); conf.setS3ManagedLedgerOffloadBucket(BUCKET); try { - S3ManagedLedgerOffloader.create(conf, scheduler); + BlobStoreManagedLedgerOffloader.create(conf, scheduler); Assert.fail("Should have thrown exception"); } catch (PulsarServerException pse) { // correct @@ -148,11 +152,11 @@ public void testNoRegionConfigured() throws Exception { @Test public void testNoBucketConfigured() throws Exception { ServiceConfiguration conf = new ServiceConfiguration(); - conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME); + conf.setManagedLedgerOffloadDriver("s3"); conf.setS3ManagedLedgerOffloadRegion("eu-west-1"); try { - S3ManagedLedgerOffloader.create(conf, scheduler); + BlobStoreManagedLedgerOffloader.create(conf, scheduler); Assert.fail("Should have thrown exception"); } catch (PulsarServerException pse) { // correct @@ -162,13 +166,13 @@ public void testNoBucketConfigured() throws Exception { @Test public void testSmallBlockSizeConfigured() throws Exception { ServiceConfiguration conf = new ServiceConfiguration(); - conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME); + conf.setManagedLedgerOffloadDriver("s3"); conf.setS3ManagedLedgerOffloadRegion("eu-west-1"); conf.setS3ManagedLedgerOffloadBucket(BUCKET); conf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(1024); try { - S3ManagedLedgerOffloader.create(conf, scheduler); + BlobStoreManagedLedgerOffloader.create(conf, scheduler); Assert.fail("Should have thrown exception"); } catch (PulsarServerException pse) { // correct @@ -178,7 +182,7 @@ public void testSmallBlockSizeConfigured() throws Exception { @Test public void testOffloadAndRead() throws Exception { ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); @@ -213,21 +217,22 @@ public void testOffloadFailInitDataBlockUpload() throws Exception { // mock throw exception when initiateMultipartUpload try { - AmazonS3 mockS3client = Mockito.spy(s3client); + + BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore)); Mockito - .doThrow(new AmazonServiceException(failureString)) - .when(mockS3client).initiateMultipartUpload(any()); + .doThrow(new RuntimeException(failureString)) + .when(spiedBlobStore).initiateMultipartUpload(any(), any(), any()); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); offloader.offload(readHandle, uuid, new HashMap<>()).get(); Assert.fail("Should throw exception when initiateMultipartUpload"); } catch (Exception e) { // excepted - Assert.assertTrue(e.getCause() instanceof AmazonServiceException); + Assert.assertTrue(e.getCause() instanceof RuntimeException); Assert.assertTrue(e.getCause().getMessage().contains(failureString)); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } } @@ -239,22 +244,21 @@ public void testOffloadFailDataBlockPartUpload() throws Exception { // mock throw exception when uploadPart try { - AmazonS3 mockS3client = Mockito.spy(s3client); + BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore)); Mockito - .doThrow(new AmazonServiceException("fail DataBlockPartUpload")) - .when(mockS3client).uploadPart(any()); - Mockito.doNothing().when(mockS3client).abortMultipartUpload(any()); + .doThrow(new RuntimeException(failureString)) + .when(spiedBlobStore).uploadMultipartPart(any(), anyInt(), any()); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, - DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); offloader.offload(readHandle, uuid, new HashMap<>()).get(); Assert.fail("Should throw exception for when uploadPart"); } catch (Exception e) { // excepted - Assert.assertTrue(e.getCause() instanceof AmazonServiceException); + Assert.assertTrue(e.getCause() instanceof RuntimeException); Assert.assertTrue(e.getCause().getMessage().contains(failureString)); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } } @@ -266,22 +270,25 @@ public void testOffloadFailDataBlockUploadComplete() throws Exception { // mock throw exception when completeMultipartUpload try { - AmazonS3 mockS3client = Mockito.spy(s3client); + BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore)); + Mockito + .doThrow(new RuntimeException(failureString)) + .when(spiedBlobStore).completeMultipartUpload(any(), any()); Mockito - .doThrow(new AmazonServiceException(failureString)) - .when(mockS3client).completeMultipartUpload(any()); - Mockito.doNothing().when(mockS3client).abortMultipartUpload(any()); + .doNothing() + .when(spiedBlobStore).abortMultipartUpload(any()); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, - DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); offloader.offload(readHandle, uuid, new HashMap<>()).get(); + Assert.fail("Should throw exception for when completeMultipartUpload"); } catch (Exception e) { // excepted - Assert.assertTrue(e.getCause() instanceof AmazonServiceException); + Assert.assertTrue(e.getCause() instanceof RuntimeException); Assert.assertTrue(e.getCause().getMessage().contains(failureString)); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } } @@ -293,21 +300,22 @@ public void testOffloadFailPutIndexBlock() throws Exception { // mock throw exception when putObject try { - AmazonS3 mockS3client = Mockito.spy(s3client); + BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore)); Mockito - .doThrow(new AmazonServiceException(failureString)) - .when(mockS3client).putObject(any()); + .doThrow(new RuntimeException(failureString)) + .when(spiedBlobStore).putBlob(any(), any()); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler, - DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); offloader.offload(readHandle, uuid, new HashMap<>()).get(); + Assert.fail("Should throw exception for when putObject for index block"); } catch (Exception e) { // excepted - Assert.assertTrue(e.getCause() instanceof AmazonServiceException); + Assert.assertTrue(e.getCause() instanceof RuntimeException); Assert.assertTrue(e.getCause().getMessage().contains(failureString)); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } } @@ -328,7 +336,7 @@ public void testOffloadReadRandomAccess() throws Exception { randomAccesses[i][1] = second; } - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); @@ -360,7 +368,7 @@ public void testOffloadReadRandomAccess() throws Exception { @Test public void testOffloadReadInvalidEntryIds() throws Exception { ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); @@ -385,46 +393,47 @@ public void testOffloadReadInvalidEntryIds() throws Exception { public void testDeleteOffloaded() throws Exception { ReadHandle readHandle = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); UUID uuid = UUID.randomUUID(); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); // verify object exist after offload offloader.offload(readHandle, uuid, new HashMap<>()).get(); - Assert.assertTrue(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertTrue(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); // verify object deleted after delete offloader.deleteOffloaded(readHandle.getId(), uuid).get(); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } @Test public void testDeleteOffloadedFail() throws Exception { + String failureString = "fail deleteOffloaded"; ReadHandle readHandle = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); UUID uuid = UUID.randomUUID(); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, - DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); - String failureString = "fail deleteOffloaded"; - AmazonS3 mockS3client = Mockito.spy(s3client); + BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore)); + Mockito - .doThrow(new AmazonServiceException(failureString)) - .when(mockS3client).deleteObjects(any()); + .doThrow(new RuntimeException(failureString)) + .when(spiedBlobStore).removeBlobs(any(), any()); + + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); try { // verify object exist after offload offloader.offload(readHandle, uuid, new HashMap<>()).get(); - Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); offloader.deleteOffloaded(readHandle.getId(), uuid).get(); } catch (Exception e) { // expected - Assert.assertTrue(e.getCause() instanceof AmazonServiceException); Assert.assertTrue(e.getCause().getMessage().contains(failureString)); // verify object still there. - Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); - Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } } @@ -441,7 +450,7 @@ public void testOffloadEmpty() throws Exception { Mockito.doReturn(1234L).when(readHandle).getId(); UUID uuid = UUID.randomUUID(); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); try { offloader.offload(readHandle, uuid, new HashMap<>()).get(); @@ -454,48 +463,51 @@ public void testOffloadEmpty() throws Exception { @Test public void testReadUnknownDataVersion() throws Exception { ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); String dataKey = dataBlockOffloadKey(toWrite.getId(), uuid); - ObjectMetadata md = s3client.getObjectMetadata(BUCKET, dataKey); - md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345)); - s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET, dataKey).withNewObjectMetadata(md)); + + Map<String, String> userMeta = blobStore.blobMetadata(BUCKET, dataKey).getUserMetadata(); + userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345)); + blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, CopyOptions.builder().userMetadata(userMeta).build()); try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) { toRead.readAsync(0, 0).get(); Assert.fail("Shouldn't have been able to read"); } catch (ExecutionException e) { + log.error("Exception: ", e); Assert.assertEquals(e.getCause().getClass(), IOException.class); - Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version")); + Assert.assertTrue(e.getCause().getMessage().contains("Error reading from BlobStore")); } - md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345)); - s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET, dataKey).withNewObjectMetadata(md)); + userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345)); + blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, CopyOptions.builder().userMetadata(userMeta).build()); try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) { toRead.readAsync(0, 0).get(); Assert.fail("Shouldn't have been able to read"); } catch (ExecutionException e) { Assert.assertEquals(e.getCause().getClass(), IOException.class); - Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version")); + Assert.assertTrue(e.getCause().getMessage().contains("Error reading from BlobStore")); } } @Test public void testReadUnknownIndexVersion() throws Exception { ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); - LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler, DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); String indexKey = indexBlockOffloadKey(toWrite.getId(), uuid); - ObjectMetadata md = s3client.getObjectMetadata(BUCKET, indexKey); - md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345)); - s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET, indexKey).withNewObjectMetadata(md)); + + Map<String, String> userMeta = blobStore.blobMetadata(BUCKET, indexKey).getUserMetadata(); + userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345)); + blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, CopyOptions.builder().userMetadata(userMeta).build()); try { offloader.readOffloaded(toWrite.getId(), uuid).get(); @@ -505,8 +517,8 @@ public void testReadUnknownIndexVersion() throws Exception { Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version")); } - md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345)); - s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET, indexKey).withNewObjectMetadata(md)); + userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345)); + blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, CopyOptions.builder().userMetadata(userMeta).build()); try { offloader.readOffloaded(toWrite.getId(), uuid).get(); diff --git a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml index 6b8eaa427d..ec46571249 100644 --- a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml +++ b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml @@ -148,7 +148,9 @@ pulsar-proxy*: networkMode: pulsarnet* s3*: - image: adobe/s3mock + ## use latest adobe/s3mock, for issue: https://github.com/adobe/S3Mock/issues/32 + ## TODO: https://github.com/apache/incubator-pulsar/issues/2133 + image: apachepulsar/s3mock await: strategy: org.apache.pulsar.tests.NoopAwaitStrategy env: ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services