This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c8d13c  GCS offload support (2): replace `s3client` API with `jclouds` API (#2065)
3c8d13c is described below

commit 3c8d13c0546b7fed0100d499bd1841f7ce2a127e
Author: Jia Zhai <zhaiji...@gmail.com>
AuthorDate: Fri Jul 20 13:27:05 2018 +0800

    GCS offload support (2): replace `s3client` API with `jclouds` API (#2065)
    
    This is the second part of the `Google Cloud Storage` offload support.
    It replaces the `s3client` API with the `jclouds` API and makes sure the unit tests and integration tests pass.
    A follow-up change will add `Google Cloud Storage` support and the related tests.
    
    Change:
    replace the `s3client` API with the `jclouds` API in `S3ManagedLedgerOffloader`
    
    Master Issue: #2067
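    
    For illustration only (not part of this patch), a minimal sketch of the jclouds
    blobstore calls the offloader now builds on, using the in-memory "transient"
    provider that the rewritten tests use; the container and blob names below are
    placeholders:
    
        import java.io.InputStream;
        import org.jclouds.ContextBuilder;
        import org.jclouds.blobstore.BlobStore;
        import org.jclouds.blobstore.BlobStoreContext;
        import org.jclouds.blobstore.domain.Blob;
        import org.jclouds.blobstore.options.GetOptions;
    
        public class JcloudsRangedReadSketch {
            public static void main(String[] args) throws Exception {
                // Production code passes a real driver such as "aws-s3" or
                // "google-cloud-storage" instead of the in-memory "transient" provider.
                try (BlobStoreContext context =
                         ContextBuilder.newBuilder("transient").buildView(BlobStoreContext.class)) {
                    BlobStore blobStore = context.getBlobStore();
                    blobStore.createContainerInLocation(null, "pulsar-unittest");
                    // Ranged read, as BlobStoreBackedInputStreamImpl now issues it;
                    // getBlob may return null if the blob does not exist.
                    Blob blob = blobStore.getBlob("pulsar-unittest", "some-key",
                                                  new GetOptions().range(0, 999));
                    if (blob != null) {
                        try (InputStream stream = blob.getPayload().openStream()) {
                            // consume bytes 0..999 of the object here
                        }
                    }
                }
            }
        }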
---
 conf/broker.conf                                   |   2 +-
 distribution/server/src/assemble/LICENSE.bin.txt   |  10 +-
 jclouds-shaded/pom.xml                             | 105 +++++++
 pom.xml                                            |   2 +
 pulsar-broker/pom.xml                              |   8 +-
 .../org/apache/pulsar/broker/PulsarService.java    |   6 +-
 ...pl.java => BlobStoreBackedInputStreamImpl.java} |  63 ++--
 ...mpl.java => BlobStoreBackedReadHandleImpl.java} |  49 ++-
 ...r.java => BlobStoreManagedLedgerOffloader.java} | 220 +++++++++-----
 ...st.java => BlobStoreBackedInputStreamTest.java} | 114 ++++---
 .../{S3TestBase.java => BlobStoreTestBase.java}    |  36 ++-
 .../org/apache/pulsar/broker/offload/S3Mock.java   | 334 ---------------------
 ...va => BlobStoreManagedLedgerOffloaderTest.java} | 190 ++++++------
 ...luster-2-bookie-1-broker-unstarted-with-s3.yaml |   4 +-
 14 files changed, 518 insertions(+), 625 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 2e40d82..96433aa 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -486,7 +486,7 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe
 
 ### --- Ledger Offloading --- ###
 
-# Driver to use to offload old data to long term storage (Possible values: S3)
+# Driver to use to offload old data to long term storage (Possible values: S3, aws-s3, google-cloud-storage)
 managedLedgerOffloadDriver=
 
 # Maximum number of thread pool threads for ledger offloading
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 33b9748..85a97e5 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -334,10 +334,10 @@ The Apache Software License, Version 2.0
     - io.swagger-swagger-annotations-1.5.3.jar
     - io.swagger-swagger-core-1.5.3.jar
     - io.swagger-swagger-models-1.5.3.jar
- * DataSketches 
+ * DataSketches
     - com.yahoo.datasketches-memory-0.8.3.jar
     - com.yahoo.datasketches-sketches-core-0.8.3.jar
- * Apache Commons 
+ * Apache Commons
     - commons-beanutils-commons-beanutils-1.7.0.jar
     - commons-beanutils-commons-beanutils-core-1.8.0.jar
     - commons-cli-commons-cli-1.2.jar
@@ -461,6 +461,12 @@ The Apache Software License, Version 2.0
     - org.xerial.snappy-snappy-java-1.1.1.3.jar
   * Flatbuffers Java
     - com.google.flatbuffers-flatbuffers-java-1.9.0.jar
+  * Apache Jclouds
+    - org.apache.jclouds-allblobstore-2.2.0-SNAPSHOT.jar
+  * Google Guice Core Library
+    - com.google.inject.guice-3.0.jar
+    - com.google.inject.extensions-guice-multibindings-3.0.jar
+    - com.google.inject.extensions-guice-assistedinject-3.0.jar
 
 
 BSD 3-clause "New" or "Revised" License
diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml
new file mode 100644
index 0000000..8aa1786
--- /dev/null
+++ b/jclouds-shaded/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>2.2.0-incubating-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>jclouds-shaded</artifactId>
+  <name>Apache Pulsar :: Jclouds shaded</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.5</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.jclouds</groupId>
+      <artifactId>jclouds-allblobstore</artifactId>
+      <version>2.2.0-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+
+  <repositories>
+    <repository>
+      <id>jclouds-snapshots</id>
+      <url>https://repository.apache.org/content/repositories/snapshots</url>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>true</createDependencyReducedPom>
+              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+              <minimizeJar>false</minimizeJar>
+
+              <artifactSet>
+                <includes>
+                  <include>com.google.code.gson:gson</include>
+                  <include>com.google.guava:guava</include>
+                  <include>org.apache.jclouds:*</include>
+                  <include>org.apache.jclouds.api:*</include>
+                  <include>org.apache.jclouds.common:*</include>
+                  <include>org.apache.jclouds.provider:*</include>
+                  <include>com.google.inject.extensions:guice-assistedinject</include>
+                  <include>com.google.inject:guice</include>
+                  <include>com.google.inject.extensions:guice-multibindings</include>
+                </includes>
+              </artifactSet>
+
+              <relocations>
+                <relocation>
+                  <pattern>com.google</pattern>
+                  <shadedPattern>org.apache.pulsar.shaded.com.google</shadedPattern>
+                </relocation>
+              </relocations>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pom.xml b/pom.xml
index 263d362..6369a10 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,6 +100,8 @@ flexible messaging model and an intuitive client API.</description>
     <module>pulsar-zookeeper</module>
     <module>pulsar-log4j2-appender</module>
     <module>protobuf-shaded</module>
+    <!-- jclouds shaded for gson conflict: https://issues.apache.org/jira/browse/JCLOUDS-1166 -->
+    <module>jclouds-shaded</module>
 
     <!-- functions-related modules -->
     <module>pulsar-functions</module>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index d505c7a..ac264bd 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -127,7 +127,7 @@
 
     <dependency>
       <groupId>com.amazonaws</groupId>
-      <artifactId>aws-java-sdk-s3</artifactId>
+      <artifactId>aws-java-sdk-core</artifactId>
     </dependency>
 
     <!-- functions related dependencies (begin) -->
@@ -273,6 +273,12 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>jclouds-shaded</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2a341af..c0aa9d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -61,7 +61,7 @@ import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
 import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader;
+import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
@@ -657,8 +657,8 @@ public class PulsarService implements AutoCloseable {
     public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf)
             throws PulsarServerException {
         if (conf.getManagedLedgerOffloadDriver() != null
-            && conf.getManagedLedgerOffloadDriver().equalsIgnoreCase(S3ManagedLedgerOffloader.DRIVER_NAME)) {
-            return S3ManagedLedgerOffloader.create(conf, getOffloaderScheduler(conf));
+            && BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver())) {
+            return BlobStoreManagedLedgerOffloader.create(conf, getOffloaderScheduler(conf));
         } else {
             return NullLedgerOffloader.INSTANCE;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java
similarity index 68%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java
index e55e61b..19fac59 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java
@@ -18,27 +18,22 @@
  */
 package org.apache.pulsar.broker.offload.impl;
 
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
-
-import java.io.InputStream;
 import java.io.IOException;
-
+import java.io.InputStream;
 import org.apache.pulsar.broker.offload.BackedInputStream;
-import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck;
-
+import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.VersionCheck;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.options.GetOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class S3BackedInputStreamImpl extends BackedInputStream {
-    private static final Logger log = LoggerFactory.getLogger(S3BackedInputStreamImpl.class);
+public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
+    private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedInputStreamImpl.class);
 
-    private final AmazonS3 s3client;
+    private final BlobStore blobStore;
     private final String bucket;
     private final String key;
     private final VersionCheck versionCheck;
@@ -50,10 +45,10 @@ public class S3BackedInputStreamImpl extends BackedInputStream {
     private long bufferOffsetStart;
     private long bufferOffsetEnd;
 
-    public S3BackedInputStreamImpl(AmazonS3 s3client, String bucket, String key,
-                                   VersionCheck versionCheck,
-                                   long objectLen, int bufferSize) {
-        this.s3client = s3client;
+    public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String key,
+                                          VersionCheck versionCheck,
+                                          long objectLen, int bufferSize) {
+        this.blobStore = blobStore;
         this.bucket = bucket;
         this.key = key;
         this.versionCheck = versionCheck;
@@ -76,26 +71,24 @@ public class S3BackedInputStreamImpl extends BackedInputStream {
             long startRange = cursor;
             long endRange = Math.min(cursor + bufferSize - 1,
                                      objectLen - 1);
-            GetObjectRequest req = new GetObjectRequest(bucket, key)
-                .withRange(startRange, endRange);
-            log.debug("Reading range {}-{} from {}/{}", startRange, endRange, bucket, key);
-            try (S3Object obj = s3client.getObject(req)) {
-                versionCheck.check(key, obj.getObjectMetadata());
-
-                Long[] range = obj.getObjectMetadata().getContentRange();
-                long bytesRead = range[1] - range[0] + 1;
 
-                buffer.clear();
-                bufferOffsetStart = range[0];
-                bufferOffsetEnd = range[1];
-                InputStream s = obj.getObjectContent();
-                int bytesToCopy = (int)bytesRead;
-                while (bytesToCopy > 0) {
-                    bytesToCopy -= buffer.writeBytes(s, bytesToCopy);
+            try {
+                Blob blob = blobStore.getBlob(bucket, key, new GetOptions().range(startRange, endRange));
+                versionCheck.check(key, blob);
+
+                try (InputStream stream = blob.getPayload().openStream()) {
+                    buffer.clear();
+                    bufferOffsetStart = startRange;
+                    bufferOffsetEnd = endRange;
+                    long bytesRead = endRange - startRange + 1;
+                    int bytesToCopy = (int) bytesRead;
+                    while (bytesToCopy > 0) {
+                        bytesToCopy -= buffer.writeBytes(stream, bytesToCopy);
+                    }
+                    cursor += buffer.readableBytes();
                 }
-                cursor += buffer.readableBytes();
-            } catch (AmazonClientException e) {
-                throw new IOException("Error reading from S3", e);
+            } catch (Throwable e) {
+                throw new IOException("Error reading from BlobStore", e);
             }
         }
         return true;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
similarity index 82%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
index 08b5ea6..36b382b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
@@ -19,13 +19,8 @@
 package org.apache.pulsar.broker.offload.impl;
 
 import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
-
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,7 +28,6 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
-
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -42,18 +36,18 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
-
+import org.apache.pulsar.broker.offload.BackedInputStream;
 import org.apache.pulsar.broker.offload.OffloadIndexBlock;
 import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
 import org.apache.pulsar.broker.offload.OffloadIndexEntry;
-import org.apache.pulsar.broker.offload.BackedInputStream;
-import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck;
-
+import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.VersionCheck;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class S3BackedReadHandleImpl implements ReadHandle {
-    private static final Logger log = LoggerFactory.getLogger(S3BackedReadHandleImpl.class);
+public class BlobStoreBackedReadHandleImpl implements ReadHandle {
+    private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class);
 
     private final long ledgerId;
     private final OffloadIndexBlock index;
@@ -61,9 +55,9 @@ public class S3BackedReadHandleImpl implements ReadHandle {
     private final DataInputStream dataStream;
     private final ExecutorService executor;
 
-    private S3BackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
-                                   BackedInputStream inputStream,
-                                   ExecutorService executor) {
+    private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
+                                          BackedInputStream inputStream,
+                                          ExecutorService executor) {
         this.ledgerId = ledgerId;
         this.index = index;
         this.inputStream = inputStream;
@@ -189,22 +183,19 @@ public class S3BackedReadHandleImpl implements ReadHandle {
     }
 
     public static ReadHandle open(ScheduledExecutorService executor,
-                                  AmazonS3 s3client, String bucket, String key, String indexKey,
+                                  BlobStore blobStore, String bucket, String key, String indexKey,
                                   VersionCheck versionCheck,
                                   long ledgerId, int readBufferSize)
             throws AmazonClientException, IOException {
-        GetObjectRequest req = new GetObjectRequest(bucket, indexKey);
-        try (S3Object obj = s3client.getObject(req)) {
-            versionCheck.check(indexKey, obj.getObjectMetadata());
-
-            OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
-            OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent());
-
-            BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key,
-                                                                          versionCheck,
-                                                                          index.getDataObjectLength(),
-                                                                          readBufferSize);
-            return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor);
-        }
+        Blob blob = blobStore.getBlob(bucket, indexKey);
+        versionCheck.check(indexKey, blob);
+        OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
+        OffloadIndexBlock index = indexBuilder.fromStream(blob.getPayload().openStream());
+
+        BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
+            versionCheck,
+            index.getDataObjectLength(),
+            readBufferSize);
+        return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
similarity index 55%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
index ec74d27..528ba28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
@@ -18,24 +18,17 @@
  */
 package org.apache.pulsar.broker.offload.impl;
 
-import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.UploadPartRequest;
-import com.amazonaws.services.s3.model.UploadPartResult;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import java.io.IOException;
-import java.util.LinkedList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.api.ReadHandle;
@@ -47,21 +40,48 @@ import org.apache.pulsar.broker.offload.BlockAwareSegmentInputStream;
 import org.apache.pulsar.broker.offload.OffloadIndexBlock;
 import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
 import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
+import org.jclouds.Constants;
+import org.jclouds.ContextBuilder;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.domain.BlobBuilder;
+import org.jclouds.blobstore.domain.MultipartPart;
+import org.jclouds.blobstore.domain.MultipartUpload;
+import org.jclouds.blobstore.options.PutOptions;
+import org.jclouds.domain.Location;
+import org.jclouds.domain.LocationBuilder;
+import org.jclouds.domain.LocationScope;
+import org.jclouds.io.Payload;
+import org.jclouds.io.Payloads;
+import org.jclouds.s3.reference.S3Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class S3ManagedLedgerOffloader implements LedgerOffloader {
-    private static final Logger log = LoggerFactory.getLogger(S3ManagedLedgerOffloader.class);
+public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
+    private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloader.class);
 
-    public static final String DRIVER_NAME = "S3";
+    public static final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage"};
 
     static final String METADATA_FORMAT_VERSION_KEY = "S3ManagedLedgerOffloaderFormatVersion";
     static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
     static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";
     static final String CURRENT_VERSION = String.valueOf(1);
 
-    private final VersionCheck VERSION_CHECK = (key, metadata) -> {
-        String version = metadata.getUserMetadata().get(METADATA_FORMAT_VERSION_KEY);
+    public static boolean driverSupported(String driver) {
+        return Arrays.stream(DRIVER_NAMES).anyMatch(d -> d.equalsIgnoreCase(driver));
+    }
+
+    private static void addVersionInfo(BlobBuilder blobBuilder) {
+        blobBuilder.userMetadata(ImmutableMap.of(
+            METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION,
+            METADATA_SOFTWARE_VERSION_KEY, PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
+            METADATA_SOFTWARE_GITSHA_KEY, PulsarBrokerVersionStringUtils.getGitSha()));
+    }
+
+    private final VersionCheck VERSION_CHECK = (key, blob) -> {
+        // NOTE all metadata in jclouds comes out as lowercase, in an effort to normalize the providers
+        String version = blob.getMetadata().getUserMetadata().get(METADATA_FORMAT_VERSION_KEY.toLowerCase());
         if (version == null || !version.equals(CURRENT_VERSION)) {
             throw new IOException(String.format("Invalid object version %s for %s, expect %s",
                                                 version, key, CURRENT_VERSION));
@@ -69,15 +89,21 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
     };
 
     private final OrderedScheduler scheduler;
-    private final AmazonS3 s3client;
+
+    // container in jclouds
     private final String bucket;
     // max block size for each data block.
     private int maxBlockSize;
     private final int readBufferSize;
 
-    public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
-                                                  OrderedScheduler scheduler)
+    private BlobStoreContext context;
+    private BlobStore blobStore;
+    Location location = null;
+
+    public static BlobStoreManagedLedgerOffloader create(ServiceConfiguration conf,
+                                                         OrderedScheduler scheduler)
             throws PulsarServerException {
+        String driver = conf.getManagedLedgerOffloadDriver();
         String region = conf.getS3ManagedLedgerOffloadRegion();
         String bucket = conf.getS3ManagedLedgerOffloadBucket();
         String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
@@ -96,23 +122,66 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
             throw new PulsarServerException("s3ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB");
         }
 
-        AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
+        return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler, maxBlockSize, readBufferSize, endpoint, region);
+    }
+
+    // build context for jclouds BlobStoreContext
+    BlobStoreManagedLedgerOffloader(String driver, String container, OrderedScheduler scheduler,
+                                    int maxBlockSize, int readBufferSize, String endpoint, String region) {
+        this.scheduler = scheduler;
+        this.readBufferSize = readBufferSize;
+
+        this.bucket = container;
+        this.maxBlockSize = maxBlockSize;
+
+        Properties overrides = new Properties();
+        // This property controls the number of parts being uploaded in parallel.
+        overrides.setProperty("jclouds.mpu.parallel.degree", "1");
+        overrides.setProperty("jclouds.mpu.parts.size", Integer.toString(maxBlockSize));
+        overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000");
+        overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, Integer.toString(100));
+
+        ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
+
+        AWSCredentials credentials = null;
+        try {
+            DefaultAWSCredentialsProviderChain creds = DefaultAWSCredentialsProviderChain.getInstance();
+            credentials = creds.getCredentials();
+        } catch (Exception e) {
+            log.error("Exception when get credentials for s3 ", e);
+        }
+
+        String id = "accesskey";
+        String key = "secretkey";
+        if (credentials != null) {
+            id = credentials.getAWSAccessKeyId();
+            key = credentials.getAWSSecretKey();
+        }
+        contextBuilder.credentials(id, key);
+
         if (!Strings.isNullOrEmpty(endpoint)) {
-            builder.setEndpointConfiguration(new EndpointConfiguration(endpoint, region));
-            builder.setPathStyleAccessEnabled(true);
-        } else {
-            builder.setRegion(region);
+            contextBuilder.endpoint(endpoint);
+            overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
+        }
+        if (!Strings.isNullOrEmpty(region)) {
+            this.location = new LocationBuilder().scope(LocationScope.REGION).id(region).description(region).build();
         }
-        return new S3ManagedLedgerOffloader(builder.build(), bucket, scheduler, maxBlockSize, readBufferSize);
+
+        log.info("Constructor driver: {}, host: {}, container: {}, region: {} 
",  driver, endpoint, bucket, region);
+
+        contextBuilder.overrides(overrides);
+        this.context = contextBuilder.buildView(BlobStoreContext.class);
+        this.blobStore = context.getBlobStore();
     }
 
-    S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, OrderedScheduler scheduler,
-                             int maxBlockSize, int readBufferSize) {
-        this.s3client = s3client;
-        this.bucket = bucket;
+    // construct from an existing BlobStore (no jclouds context is built here)
+    BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container, OrderedScheduler scheduler,
+                                    int maxBlockSize, int readBufferSize) {
         this.scheduler = scheduler;
-        this.maxBlockSize = maxBlockSize;
         this.readBufferSize = readBufferSize;
+        this.bucket = container;
+        this.maxBlockSize = maxBlockSize;
+        this.blobStore = blobStore;
     }
 
     static String dataBlockOffloadKey(long ledgerId, UUID uuid) {
@@ -141,15 +210,15 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
             String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid);
             String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid);
 
-            ObjectMetadata dataMetadata = new ObjectMetadata();
-            addVersionInfo(dataMetadata);
-
-            InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey, dataMetadata);
-            InitiateMultipartUploadResult dataBlockRes = null;
+            MultipartUpload mpu = null;
+            List<MultipartPart> parts = Lists.newArrayList();
 
             // init multi part upload for data block.
             try {
-                dataBlockRes = s3client.initiateMultipartUpload(dataBlockReq);
+                BlobBuilder blobBuilder = blobStore.blobBuilder(dataBlockKey);
+                addVersionInfo(blobBuilder);
+                Blob blob = blobBuilder.build();
+                mpu = blobStore.initiateMultipartUpload(bucket, blob.getMetadata(), new PutOptions());
             } catch (Throwable t) {
                 promise.completeExceptionally(t);
                 return;
@@ -161,7 +230,6 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
                 long startEntry = 0;
                 int partId = 1;
                 long entryBytesWritten = 0;
-                List<PartETag> etags = new LinkedList<>();
                 while (startEntry <= readHandle.getLastAddConfirmed()) {
                     int blockSize = BlockAwareSegmentInputStreamImpl
                         .calculateBlockSize(maxBlockSize, readHandle, startEntry, entryBytesWritten);
@@ -169,15 +237,13 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
                     try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl(
                         readHandle, startEntry, blockSize)) {
 
-                        UploadPartResult uploadRes = s3client.uploadPart(
-                            new UploadPartRequest()
-                                .withBucketName(bucket)
-                                .withKey(dataBlockKey)
-                                .withUploadId(dataBlockRes.getUploadId())
-                                .withInputStream(blockStream)
-                                .withPartSize(blockSize)
-                                .withPartNumber(partId));
-                        etags.add(uploadRes.getPartETag());
+                        Payload partPayload = Payloads.newInputStreamPayload(blockStream);
+                        partPayload.getContentMetadata().setContentLength((long)blockSize);
+                        partPayload.getContentMetadata().setContentType("application/octet-stream");
+                        parts.add(blobStore.uploadMultipartPart(mpu, partId, partPayload));
+                        log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
+                            bucket, dataBlockKey, partId, mpu.id());
+
                         indexBuilder.addBlock(startEntry, partId, blockSize);
 
                         if (blockStream.getEndEntryId() != -1) {
@@ -193,17 +259,16 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
                     dataObjectLength += blockSize;
                 }
 
-                s3client.completeMultipartUpload(new CompleteMultipartUploadRequest()
-                    .withBucketName(bucket).withKey(dataBlockKey)
-                    .withUploadId(dataBlockRes.getUploadId())
-                    .withPartETags(etags));
+                blobStore.completeMultipartUpload(mpu, parts);
+                mpu = null;
             } catch (Throwable t) {
                 try {
-                    s3client.abortMultipartUpload(
-                        new AbortMultipartUploadRequest(bucket, dataBlockKey, dataBlockRes.getUploadId()));
+                    if (mpu != null) {
+                        blobStore.abortMultipartUpload(mpu);
+                    }
                 } catch (Throwable throwable) {
                     log.error("Failed abortMultipartUpload in bucket - {} with 
key - {}, uploadId - {}.",
-                        bucket, dataBlockKey, dataBlockRes.getUploadId(), 
throwable);
+                        bucket, dataBlockKey, mpu.id(), throwable);
                 }
                 promise.completeExceptionally(t);
                 return;
@@ -213,19 +278,22 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
             try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build();
                  OffloadIndexBlock.IndexInputStream indexStream = index.toStream()) {
                 // write the index block
-                ObjectMetadata metadata = new ObjectMetadata();
-                metadata.setContentLength(indexStream.getStreamSize());
-                addVersionInfo(metadata);
-
-                s3client.putObject(new PutObjectRequest(
-                    bucket,
-                    indexBlockKey,
-                    indexStream,
-                    metadata));
+                BlobBuilder blobBuilder = blobStore.blobBuilder(indexBlockKey);
+                addVersionInfo(blobBuilder);
+                Payload indexPayload = Payloads.newInputStreamPayload(indexStream);
+                indexPayload.getContentMetadata().setContentLength((long)indexStream.getStreamSize());
+                indexPayload.getContentMetadata().setContentType("application/octet-stream");
+
+                Blob blob = blobBuilder
+                    .payload(indexPayload)
+                    .contentLength((long)indexStream.getStreamSize())
+                    .build();
+
+                blobStore.putBlob(bucket, blob);
                 promise.complete(null);
             } catch (Throwable t) {
                 try {
-                    s3client.deleteObject(bucket, dataBlockKey);
+                    blobStore.removeBlob(bucket, dataBlockKey);
                 } catch (Throwable throwable) {
                     log.error("Failed deleteObject in bucket - {} with key - 
{}.",
                         bucket, dataBlockKey, throwable);
@@ -244,35 +312,31 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
         String indexKey = indexBlockOffloadKey(ledgerId, uid);
         scheduler.chooseThread(ledgerId).submit(() -> {
                 try {
-                    promise.complete(S3BackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
-                                                                 s3client,
+                    promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
+                                                                 blobStore,
                                                                  bucket, key, indexKey,
                                                                  VERSION_CHECK,
                                                                  ledgerId, readBufferSize));
                 } catch (Throwable t) {
+                    log.error("Failed readOffloaded: ", t);
                     promise.completeExceptionally(t);
                 }
             });
         return promise;
     }
 
-    private static void addVersionInfo(ObjectMetadata metadata) {
-        metadata.getUserMetadata().put(METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION);
-        metadata.getUserMetadata().put(METADATA_SOFTWARE_VERSION_KEY,
-                                       PulsarBrokerVersionStringUtils.getNormalizedVersionString());
-        metadata.getUserMetadata().put(METADATA_SOFTWARE_GITSHA_KEY, PulsarBrokerVersionStringUtils.getGitSha());
-    }
+
 
     @Override
     public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.chooseThread(ledgerId).submit(() -> {
             try {
-                s3client.deleteObjects(new DeleteObjectsRequest(bucket)
-                    .withKeys(dataBlockOffloadKey(ledgerId, uid), indexBlockOffloadKey(ledgerId, uid)));
+                blobStore.removeBlobs(bucket,
+                    ImmutableList.of(dataBlockOffloadKey(ledgerId, uid), indexBlockOffloadKey(ledgerId, uid)));
                 promise.complete(null);
             } catch (Throwable t) {
-                log.error("Failed delete s3 Object ", t);
+                log.error("Failed delete Blob", t);
                 promise.completeExceptionally(t);
             }
         });
@@ -281,7 +345,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
     }
 
     public interface VersionCheck {
-        void check(String key, ObjectMetadata md) throws IOException;
+        void check(String key, Blob blob) throws IOException;
     }
 }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java
similarity index 61%
rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java
index 45bca52..bde4dde 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java
@@ -18,29 +18,35 @@
  */
 package org.apache.pulsar.broker.offload;
 
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.spy;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-
-import java.io.InputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
-
 import lombok.extern.slf4j.Slf4j;
-
-import org.apache.pulsar.broker.offload.impl.S3BackedInputStreamImpl;
-
+import org.apache.pulsar.broker.offload.impl.BlobStoreBackedInputStreamImpl;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.options.GetOptions;
+import org.jclouds.io.Payload;
+import org.jclouds.io.Payloads;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @Slf4j
-class S3BackedInputStreamTest extends S3TestBase {
+class BlobStoreBackedInputStreamTest extends BlobStoreTestBase {
+    private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedInputStreamTest.class);
+
     class RandomInputStream extends InputStream {
         final Random r;
         int bytesRemaining;
@@ -85,16 +91,21 @@ class S3BackedInputStreamTest extends S3TestBase {
 
     @Test
     public void testReadingFullObject() throws Exception {
-        String objectKey = "foobar";
+        String objectKey = "testReadingFull";
         int objectSize = 12345;
         RandomInputStream toWrite = new RandomInputStream(0, objectSize);
         RandomInputStream toCompare = new RandomInputStream(0, objectSize);
 
-        ObjectMetadata metadata = new ObjectMetadata();
-        metadata.setContentLength(objectSize);
-        s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+        Payload payload = Payloads.newInputStreamPayload(toWrite);
+        payload.getContentMetadata().setContentLength((long)objectSize);
+        Blob blob = blobStore.blobBuilder(objectKey)
+            .payload(payload)
+            .contentLength((long)objectSize)
+            .build();
+        String ret = blobStore.putBlob(BUCKET, blob);
+        log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", 
objectKey, BUCKET, ret);
 
-        BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey,
+        BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
                                                                 (key, md) -> {},
                                                                 objectSize, 1000);
         assertStreamsMatch(toTest, toCompare);
@@ -102,24 +113,29 @@ class S3BackedInputStreamTest extends S3TestBase {
 
     @Test
     public void testReadingFullObjectByBytes() throws Exception {
-        String objectKey = "foobar";
+        String objectKey = "testReadingFull2";
         int objectSize = 12345;
         RandomInputStream toWrite = new RandomInputStream(0, objectSize);
         RandomInputStream toCompare = new RandomInputStream(0, objectSize);
 
-        ObjectMetadata metadata = new ObjectMetadata();
-        metadata.setContentLength(objectSize);
-        s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+        Payload payload = Payloads.newInputStreamPayload(toWrite);
+        payload.getContentMetadata().setContentLength((long)objectSize);
+        Blob blob = blobStore.blobBuilder(objectKey)
+            .payload(payload)
+            .contentLength((long)objectSize)
+            .build();
+        String ret = blobStore.putBlob(BUCKET, blob);
+        log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", 
objectKey, BUCKET, ret);
 
-        BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey,
+        BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
                                                                 (key, md) -> {},
                                                                 objectSize, 1000);
         assertStreamsMatchByBytes(toTest, toCompare);
     }
 
     @Test(expectedExceptions = IOException.class)
-    public void testErrorOnS3Read() throws Exception {
-        BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, "doesn't exist",
+    public void testErrorOnRead() throws Exception {
+        BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, "doesn't exist",
                                                                 (key, md) -> {},
                                                                 1234, 1000);
         toTest.read();
@@ -128,7 +144,7 @@ class S3BackedInputStreamTest extends S3TestBase {
 
     @Test
     public void testSeek() throws Exception {
-        String objectKey = "foobar";
+        String objectKey = "testSeek";
         int objectSize = 12345;
         RandomInputStream toWrite = new RandomInputStream(0, objectSize);
 
@@ -141,11 +157,16 @@ class S3BackedInputStreamTest extends S3TestBase {
             seeks.put(seek, stream);
         }
 
-        ObjectMetadata metadata = new ObjectMetadata();
-        metadata.setContentLength(objectSize);
-        s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+        Payload payload = Payloads.newInputStreamPayload(toWrite);
+        payload.getContentMetadata().setContentLength((long)objectSize);
+        Blob blob = blobStore.blobBuilder(objectKey)
+            .payload(payload)
+            .contentLength((long)objectSize)
+            .build();
+        String ret = blobStore.putBlob(BUCKET, blob);
+        log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", 
objectKey, BUCKET, ret);
 
-        BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey,
+        BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
                                                                 (key, md) -> {},
                                                                 objectSize, 1000);
         for (Map.Entry<Integer, InputStream> e : seeks.entrySet()) {
@@ -156,16 +177,23 @@ class S3BackedInputStreamTest extends S3TestBase {
 
     @Test
     public void testSeekWithinCurrent() throws Exception {
-        String objectKey = "foobar";
+        String objectKey = "testSeekWithinCurrent";
         int objectSize = 12345;
         RandomInputStream toWrite = new RandomInputStream(0, objectSize);
 
-        ObjectMetadata metadata = new ObjectMetadata();
-        metadata.setContentLength(objectSize);
-        s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+        Payload payload = Payloads.newInputStreamPayload(toWrite);
+        payload.getContentMetadata().setContentLength((long)objectSize);
+        Blob blob = blobStore.blobBuilder(objectKey)
+            .payload(payload)
+            .contentLength((long)objectSize)
+            .build();
+        String ret = blobStore.putBlob(BUCKET, blob);
+        log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", 
objectKey, BUCKET, ret);
+
+        //BlobStore spiedBlobStore = spy(blobStore);
+        BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore));
 
-        AmazonS3 spiedClient = spy(s3client);
-        BackedInputStream toTest = new S3BackedInputStreamImpl(spiedClient, BUCKET, objectKey,
+        BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(spiedBlobStore, BUCKET, objectKey,
                                                                 (key, md) -> {},
                                                                 objectSize, 1000);
 
@@ -193,20 +221,26 @@ class S3BackedInputStreamTest extends S3TestBase {
             Assert.assertEquals(thirdSeek.read(), toTest.read());
         }
 
-        verify(spiedClient, times(1)).getObject(anyObject());
+        verify(spiedBlobStore, times(1))
+            .getBlob(Mockito.eq(BUCKET), Mockito.eq(objectKey), Matchers.<GetOptions>anyObject());
     }
 
     @Test
     public void testSeekForward() throws Exception {
-        String objectKey = "foobar";
+        String objectKey = "testSeekForward";
         int objectSize = 12345;
         RandomInputStream toWrite = new RandomInputStream(0, objectSize);
 
-        ObjectMetadata metadata = new ObjectMetadata();
-        metadata.setContentLength(objectSize);
-        s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+        Payload payload = Payloads.newInputStreamPayload(toWrite);
+        payload.getContentMetadata().setContentLength((long)objectSize);
+        Blob blob = blobStore.blobBuilder(objectKey)
+            .payload(payload)
+            .contentLength((long)objectSize)
+            .build();
+        String ret = blobStore.putBlob(BUCKET, blob);
+        log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", 
objectKey, BUCKET, ret);
 
-        BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey,
+        BackedInputStream toTest = new BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
                                                                 (key, md) -> {},
                                                                 objectSize, 1000);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java
similarity index 53%
rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java
index f2ea6c4..d1474f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java
@@ -18,27 +18,39 @@
  */
 package org.apache.pulsar.broker.offload;
 
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-
+import org.jclouds.ContextBuilder;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 
-public class S3TestBase {
+public class BlobStoreTestBase {
+    private static final Logger log = LoggerFactory.getLogger(BlobStoreTestBase.class);
+
     public final static String BUCKET = "pulsar-unittest";
 
-    protected AmazonS3 s3client = null;
+    protected BlobStoreContext context = null;
+    protected BlobStore blobStore = null;
 
     @BeforeMethod
     public void start() throws Exception {
-        if (Boolean.parseBoolean(System.getProperty("testRealAWS", "false"))) {
-            // To use this, ~/.aws must be configured with credentials and a default region
-            s3client = AmazonS3ClientBuilder.standard().build();
-        } else {
-            s3client = new S3Mock();
+        context = ContextBuilder.newBuilder("transient").build(BlobStoreContext.class);
+        blobStore = context.getBlobStore();
+        boolean create = blobStore.createContainerInLocation(null, BUCKET);
+
+        log.debug("TestBase Create Bucket: {}, in blobStore, result: {}", 
BUCKET, create);
+    }
+
+    @AfterMethod
+    public void tearDown() {
+        if (blobStore != null) {
+            blobStore.deleteContainer(BUCKET);
         }
 
-        if (!s3client.doesBucketExistV2(BUCKET)) {
-            s3client.createBucket(BUCKET);
+        if (context != null) {
+            context.close();
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java
deleted file mode 100644
index 4bfc140..0000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.offload;
-
-import com.amazonaws.services.s3.AbstractAmazonS3;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
-import com.amazonaws.services.s3.model.Bucket;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
-import com.amazonaws.services.s3.model.CopyObjectRequest;
-import com.amazonaws.services.s3.model.CopyObjectResult;
-import com.amazonaws.services.s3.model.DeleteObjectRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
-import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.PutObjectResult;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.UploadPartRequest;
-import com.amazonaws.services.s3.model.UploadPartResult;
-
-import com.google.common.collect.ComparisonChain;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.stream.Collectors;
-
-/**
- * Minimal mock for amazon client.
- * If making any changes, validate they behave the same as S3 by running all S3 tests with -DtestRealAWS=true
- */
-class S3Mock extends AbstractAmazonS3 {
-    @Override
-    public boolean doesBucketExistV2(String bucketName) {
-        return buckets.containsKey(bucketName);
-    }
-
-    @Override
-    public boolean doesObjectExist(String bucketName, String objectName) {
-        return buckets.containsKey(bucketName) && getBucket(bucketName).hasObject(objectName);
-    }
-
-    @Override
-    public Bucket createBucket(String bucketName) {
-        return buckets.computeIfAbsent(bucketName, (k) -> new MockBucket(k));
-    }
-
-    private MockBucket getBucket(String bucketName) throws AmazonS3Exception {
-        MockBucket bucket = buckets.get(bucketName);
-        if (bucket != null) {
-            return bucket;
-        } else {
-            throw new AmazonS3Exception("NoSuchBucket: Bucket doesn't exist");
-        }
-    }
-
-    @Override
-    public PutObjectResult putObject(PutObjectRequest putObjectRequest)
-            throws AmazonS3Exception {
-        return getBucket(putObjectRequest.getBucketName()).putObject(putObjectRequest);
-    }
-
-    @Override
-    public S3Object getObject(GetObjectRequest getObjectRequest) {
-        return getBucket(getObjectRequest.getBucketName()).getObject(getObjectRequest);
-    }
-
-    @Override
-    public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest)
-            throws AmazonS3Exception {
-        return getBucket(getObjectMetadataRequest.getBucketName()).getObjectMetadata(getObjectMetadataRequest);
-    }
-
-    @Override
-    public void deleteObject(DeleteObjectRequest deleteObjectRequest)
-            throws AmazonS3Exception {
-        getBucket(deleteObjectRequest.getBucketName()).deleteObject(deleteObjectRequest.getKey());
-    }
-
-    @Override
-    public DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteObjectsRequest)
-            throws AmazonS3Exception {
-        List<DeleteObjectsResult.DeletedObject> results = deleteObjectsRequest.getKeys().stream().map((k) -> {
-                getBucket(deleteObjectsRequest.getBucketName()).deleteObject(k.getKey());
-                DeleteObjectsResult.DeletedObject res = new DeleteObjectsResult.DeletedObject();
-                res.setKey(k.getKey());
-                return res;
-            }).collect(Collectors.toList());
-        return new DeleteObjectsResult(results);
-    }
-
-    @Override
-    public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest)
-            throws AmazonS3Exception {
-        S3Object from = getObject(new GetObjectRequest(copyObjectRequest.getSourceBucketName(),
-                                                       copyObjectRequest.getSourceKey()));
-        ObjectMetadata newMetadata = copyObjectRequest.getNewObjectMetadata();
-        if (newMetadata == null) {
-            newMetadata = from.getObjectMetadata();
-        }
-        newMetadata.setContentLength(from.getObjectMetadata().getContentLength());
-        putObject(new PutObjectRequest(copyObjectRequest.getDestinationBucketName(),
-                                       copyObjectRequest.getDestinationKey(),
-                                       from.getObjectContent(),
-                                       newMetadata));
-        return new CopyObjectResult();
-    }
-
-    @Override
-    public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest request)
-            throws AmazonS3Exception {
-        return getBucket(request.getBucketName()).initMultipart(request);
-    }
-
-    @Override
-    public UploadPartResult uploadPart(UploadPartRequest request)
-            throws AmazonS3Exception {
-        return getBucket(request.getBucketName()).uploadPart(request);
-    }
-
-    @Override
-    public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request)
-            throws AmazonS3Exception {
-        return getBucket(request.getBucketName()).completeMultipart(request);
-    }
-
-    ConcurrentHashMap<String, MockBucket> buckets = new ConcurrentHashMap<>();
-
-    static class MockBucket extends Bucket {
-        ConcurrentHashMap<String, MockObject> objects = new ConcurrentHashMap<>();
-        ConcurrentHashMap<String, MockMultipart> inprogressMultipart = new ConcurrentHashMap<>();
-
-        MockBucket(String name) {
-            super(name);
-        }
-
-        boolean hasObject(String key) {
-            return objects.containsKey(key);
-        }
-
-        PutObjectResult putObject(PutObjectRequest putObjectRequest) throws AmazonS3Exception {
-            byte[] bytes = streamToBytes(putObjectRequest.getInputStream(),
-                                         (int)putObjectRequest.getMetadata().getContentLength());
-            objects.put(putObjectRequest.getKey(),
-                        new MockObject(putObjectRequest.getMetadata(), bytes));
-            return new PutObjectResult();
-        }
-
-        S3Object getObject(GetObjectRequest getObjectRequest) throws AmazonS3Exception {
-            MockObject obj = objects.get(getObjectRequest.getKey());
-            if (obj == null) {
-                throw new AmazonS3Exception("Object doesn't exist");
-            }
-
-            S3Object s3obj = new S3Object();
-            s3obj.setBucketName(getObjectRequest.getBucketName());
-            s3obj.setKey(getObjectRequest.getKey());
-
-            if (getObjectRequest.getRange() != null) {
-                long[] range = getObjectRequest.getRange();
-                int size = (int)(range[1] - range[0] + 1);
-                ObjectMetadata metadata = obj.metadata.clone();
-                metadata.setHeader("Content-Range",
-                                   String.format("bytes %d-%d/%d",
-                                                 range[0], range[1], size));
-                s3obj.setObjectMetadata(metadata);
-                s3obj.setObjectContent(new ByteArrayInputStream(obj.data, (int)range[0], size));
-                return s3obj;
-            } else {
-                s3obj.setObjectMetadata(obj.metadata);
-                s3obj.setObjectContent(new ByteArrayInputStream(obj.data));
-                return s3obj;
-            }
-        }
-
-        void deleteObject(String key) {
-            objects.remove(key);
-        }
-
-        ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest)
-                throws AmazonS3Exception {
-            MockObject obj = objects.get(getObjectMetadataRequest.getKey());
-            if (obj == null) {
-                throw new AmazonS3Exception("Object doesn't exist");
-            }
-            return obj.metadata;
-        }
-
-        InitiateMultipartUploadResult initMultipart(InitiateMultipartUploadRequest request)
-                throws AmazonS3Exception {
-            String uploadId = UUID.randomUUID().toString();
-            inprogressMultipart.put(uploadId, new MockMultipart(request.getKey(),
-                                                                request.getObjectMetadata()));
-            InitiateMultipartUploadResult result = new InitiateMultipartUploadResult();
-            result.setBucketName(request.getBucketName());
-            result.setKey(request.getKey());
-            result.setUploadId(uploadId);
-            return result;
-        }
-
-        MockMultipart getMultipart(String uploadId, String key) throws AmazonS3Exception {
-            MockMultipart multi = inprogressMultipart.get(uploadId);
-            if (multi == null) {
-                throw new AmazonS3Exception("No such upload " + uploadId);
-            }
-            if (!multi.key.equals(key)) {
-                throw new AmazonS3Exception("Wrong key for upload " + uploadId
-                                            + ", expected " + key
-                                            + ", got " + multi.key);
-            }
-            return multi;
-        }
-
-        UploadPartResult uploadPart(UploadPartRequest request)
-                throws AmazonS3Exception {
-            MockMultipart multi = getMultipart(request.getUploadId(), request.getKey());
-            byte[] bytes = streamToBytes(request.getInputStream(), (int)request.getPartSize());
-            UploadPartResult result = new UploadPartResult();
-            result.setPartNumber(request.getPartNumber());
-            result.setETag(multi.addPart(request.getPartNumber(), bytes));
-            return result;
-        }
-
-        CompleteMultipartUploadResult completeMultipart(CompleteMultipartUploadRequest request)
-                throws AmazonS3Exception {
-            MockMultipart multi = getMultipart(request.getUploadId(), request.getKey());
-            inprogressMultipart.remove(request.getUploadId());
-            objects.put(request.getKey(), multi.complete(request.getPartETags()));
-            CompleteMultipartUploadResult result = new CompleteMultipartUploadResult();
-            result.setBucketName(request.getBucketName());
-            result.setKey(request.getKey());
-            return result;
-        }
-    }
-
-    private static byte[] streamToBytes(InputStream data, int length) throws AmazonS3Exception {
-        byte[] bytes = new byte[length];
-        try {
-            for (int i = 0; i < length; i++) {
-                bytes[i] = (byte)data.read();
-            }
-        } catch (IOException ioe) {
-            throw new AmazonS3Exception("Error loading data", ioe);
-        }
-        return bytes;
-    }
-
-    static class MockObject {
-        final ObjectMetadata metadata;
-        final byte[] data;
-        final Map<Integer, long[]> partRanges;
-
-
-        MockObject(ObjectMetadata metadata, byte[] data) {
-            this(metadata, data, null);
-        }
-
-        MockObject(ObjectMetadata metadata, byte[] data, Map<Integer, long[]> partRanges) {
-            this.metadata = metadata;
-            this.data = data;
-            this.partRanges = partRanges;
-        }
-
-    }
-
-    static class MockMultipart {
-        final String key;
-        final ObjectMetadata metadata;
-        final ConcurrentSkipListMap<PartETag, byte[]> parts = new ConcurrentSkipListMap<>(
-                (etag1, etag2) -> ComparisonChain.start().compare(etag1.getPartNumber(),
-                                                                  etag2.getPartNumber()).result());
-
-        MockMultipart(String key, ObjectMetadata metadata) {
-            this.key = key;
-            this.metadata = metadata;
-        }
-
-        String addPart(int partNumber, byte[] bytes) {
-            String etag = UUID.randomUUID().toString();
-            parts.put(new PartETag(partNumber, etag), bytes);
-            return etag;
-        }
-
-        MockObject complete(List<PartETag> tags) throws AmazonS3Exception {
-            if (parts.size() != tags.size()
-                || !parts.keySet().containsAll(tags)) {
-                throw new AmazonS3Exception("Tags don't match uploaded parts");
-            }
-
-            int totalSize = parts.values().stream().map(v -> v.length).reduce(0, (acc, v) -> acc + v);
-            byte[] full = new byte[totalSize];
-
-            Map<Integer, long[]> partRanges = new HashMap<>();
-            int start = 0;
-            for (Map.Entry<PartETag, byte[]> e : parts.entrySet()) {
-                int partLength = e.getValue().length;
-                System.arraycopy(e.getValue(), 0, full, start, partLength);
-                partRanges.put(e.getKey().getPartNumber(),
-                               new long[] { start, start + partLength - 1 });
-                start += partLength;
-            }
-            metadata.setContentLength(totalSize);
-            return new MockObject(metadata, full, partRanges);
-        }
-    }
-}
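
The hand-rolled S3Mock above can be dropped because jclouds ships a "transient" in-memory provider, which the renamed BlobStoreTestBase presumably builds its BlobStore against. A minimal sketch of that pattern, using only stock jclouds APIs (the class and container names here are illustrative, not taken from this patch):

    import org.jclouds.ContextBuilder;
    import org.jclouds.blobstore.BlobStore;
    import org.jclouds.blobstore.BlobStoreContext;

    public class TransientBlobStoreSketch {
        public static void main(String[] args) {
            // "transient" is jclouds' in-memory provider; it stands in for the
            // deleted hand-rolled S3 mock in unit tests.
            BlobStoreContext context = ContextBuilder.newBuilder("transient")
                    .buildView(BlobStoreContext.class);
            BlobStore blobStore = context.getBlobStore();

            blobStore.createContainerInLocation(null, "pulsar-unittest"); // illustrative name
            System.out.println(blobStore.containerExists("pulsar-unittest")); // prints true
            context.close();
        }
    }
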
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
similarity index 71%
rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
index 7b9d9a2..19463d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -18,24 +18,23 @@
  */
 package org.apache.pulsar.broker.offload.impl;
 
-import static org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.dataBlockOffloadKey;
-import static org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.indexBlockOffloadKey;
+import static org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.dataBlockOffloadKey;
+import static org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.indexBlockOffloadKey;
+import static org.mockito.AdditionalAnswers.delegatesTo;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
 
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.CopyObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import java.lang.reflect.Method;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -51,21 +50,25 @@ import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.offload.S3TestBase;
-import org.apache.pulsar.broker.offload.impl.DataBlockHeaderImpl;
-import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader;
+import org.apache.pulsar.broker.offload.BlobStoreTestBase;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.options.CopyOptions;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @Slf4j
-class S3ManagedLedgerOffloaderTest extends S3TestBase {
+class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase {
+    private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderTest.class);
+
     private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024;
     private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024;
     final OrderedScheduler scheduler;
     final MockBookKeeper bk;
 
-    S3ManagedLedgerOffloaderTest() throws Exception {
+    BlobStoreManagedLedgerOffloaderTest() throws Exception {
         scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
         bk = new MockBookKeeper(MockedPulsarServiceBaseTest.createMockZooKeeper());
     }
@@ -114,31 +117,32 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
 
     @Test
     public void testHappyCase() throws Exception {
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+        LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                 DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get();
     }
 
     @Test
     public void testBucketDoesNotExist() throws Exception {
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, "no-bucket", scheduler,
+        LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, "no-bucket", scheduler,
                                                                  DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         try {
             offloader.offload(buildReadHandle(), UUID.randomUUID(), new HashMap<>()).get();
             Assert.fail("Shouldn't be able to add to bucket");
         } catch (ExecutionException e) {
-            Assert.assertTrue(e.getMessage().contains("NoSuchBucket"));
+            log.error("Exception: ", e);
+            Assert.assertTrue(e.getMessage().contains("not found"));
         }
     }
 
     @Test
     public void testNoRegionConfigured() throws Exception {
         ServiceConfiguration conf = new ServiceConfiguration();
-        conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+        conf.setManagedLedgerOffloadDriver("s3");
         conf.setS3ManagedLedgerOffloadBucket(BUCKET);
 
         try {
-            S3ManagedLedgerOffloader.create(conf, scheduler);
+            BlobStoreManagedLedgerOffloader.create(conf, scheduler);
             Assert.fail("Should have thrown exception");
         } catch (PulsarServerException pse) {
             // correct
@@ -148,11 +152,11 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
     @Test
     public void testNoBucketConfigured() throws Exception {
         ServiceConfiguration conf = new ServiceConfiguration();
-        conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+        conf.setManagedLedgerOffloadDriver("s3");
         conf.setS3ManagedLedgerOffloadRegion("eu-west-1");
 
         try {
-            S3ManagedLedgerOffloader.create(conf, scheduler);
+            BlobStoreManagedLedgerOffloader.create(conf, scheduler);
             Assert.fail("Should have thrown exception");
         } catch (PulsarServerException pse) {
             // correct
@@ -162,13 +166,13 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
     @Test
     public void testSmallBlockSizeConfigured() throws Exception {
         ServiceConfiguration conf = new ServiceConfiguration();
-        conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+        conf.setManagedLedgerOffloadDriver("s3");
         conf.setS3ManagedLedgerOffloadRegion("eu-west-1");
         conf.setS3ManagedLedgerOffloadBucket(BUCKET);
         conf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(1024);
 
         try {
-            S3ManagedLedgerOffloader.create(conf, scheduler);
+            BlobStoreManagedLedgerOffloader.create(conf, scheduler);
             Assert.fail("Should have thrown exception");
         } catch (PulsarServerException pse) {
             // correct
@@ -178,7 +182,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
     @Test
     public void testOffloadAndRead() throws Exception {
         ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+        LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                                                                  DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
@@ -213,21 +217,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
 
         // mock throw exception when initiateMultipartUpload
         try {
-            AmazonS3 mockS3client = Mockito.spy(s3client);
+
+            BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore));
             Mockito
-                .doThrow(new AmazonServiceException(failureString))
-                .when(mockS3client).initiateMultipartUpload(any());
+                .doThrow(new RuntimeException(failureString))
+                .when(spiedBlobStore).initiateMultipartUpload(any(), any(), any());
 
-            LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
+            LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
                                                                      DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
             Assert.fail("Should throw exception when initiateMultipartUpload");
         } catch (Exception e) {
            // expected
-            Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
+            Assert.assertTrue(e.getCause() instanceof RuntimeException);
             Assert.assertTrue(e.getCause().getMessage().contains(failureString));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
         }
     }
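
The failure-injection tests above (and below) replace Mockito.spy(s3client) with mock(BlobStore.class, delegatesTo(blobStore)): unstubbed calls pass through to the real BlobStore while the stubbed method throws, avoiding the state-copying semantics of a plain spy. A self-contained sketch of the pattern (the class name, helper name, and failure message are illustrative):

    import static org.mockito.AdditionalAnswers.delegatesTo;
    import static org.mockito.Matchers.any;
    import static org.mockito.Mockito.mock;

    import org.jclouds.blobstore.BlobStore;
    import org.mockito.Mockito;

    class FailureInjectionSketch {
        // Wraps a real BlobStore so every call delegates unchanged,
        // except initiateMultipartUpload, which always throws; a test can
        // then verify that the offloader cleans up after the failure.
        static BlobStore failingMultipartInit(BlobStore real) {
            BlobStore spied = mock(BlobStore.class, delegatesTo(real));
            Mockito.doThrow(new RuntimeException("injected failure"))
                   .when(spied).initiateMultipartUpload(any(), any(), any());
            return spied;
        }
    }
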
 
@@ -239,22 +244,21 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
 
         // mock throw exception when uploadPart
         try {
-            AmazonS3 mockS3client = Mockito.spy(s3client);
+            BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore));
             Mockito
-                .doThrow(new AmazonServiceException("fail DataBlockPartUpload"))
-                .when(mockS3client).uploadPart(any());
-            Mockito.doNothing().when(mockS3client).abortMultipartUpload(any());
+                .doThrow(new RuntimeException(failureString))
+                .when(spiedBlobStore).uploadMultipartPart(any(), anyInt(), any());
 
-            LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
-                                                                     DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+            LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
+                DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
             Assert.fail("Should throw exception for when uploadPart");
         } catch (Exception e) {
            // expected
-            Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
+            Assert.assertTrue(e.getCause() instanceof RuntimeException);
             Assert.assertTrue(e.getCause().getMessage().contains(failureString));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
         }
     }
 
@@ -266,22 +270,25 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
 
         // mock throw exception when completeMultipartUpload
         try {
-            AmazonS3 mockS3client = Mockito.spy(s3client);
+            BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore));
+            Mockito
+                .doThrow(new RuntimeException(failureString))
+                .when(spiedBlobStore).completeMultipartUpload(any(), any());
             Mockito
-                .doThrow(new AmazonServiceException(failureString))
-                .when(mockS3client).completeMultipartUpload(any());
-            Mockito.doNothing().when(mockS3client).abortMultipartUpload(any());
+                .doNothing()
+                .when(spiedBlobStore).abortMultipartUpload(any());
 
-            LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
-                                                                     DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+            LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
+                DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
+
             Assert.fail("Should throw exception for when 
completeMultipartUpload");
         } catch (Exception e) {
            // expected
-            Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
+            Assert.assertTrue(e.getCause() instanceof RuntimeException);
             Assert.assertTrue(e.getCause().getMessage().contains(failureString));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
         }
     }
 
@@ -293,21 +300,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
 
         // mock throw exception when putObject
         try {
-            AmazonS3 mockS3client = Mockito.spy(s3client);
+            BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore));
             Mockito
-                .doThrow(new AmazonServiceException(failureString))
-                .when(mockS3client).putObject(any());
+                .doThrow(new RuntimeException(failureString))
+                .when(spiedBlobStore).putBlob(any(), any());
 
-            LedgerOffloader offloader = new S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
-                                                                     DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+            LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
+                DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
+
             Assert.fail("Should throw exception for when putObject for index 
block");
         } catch (Exception e) {
            // expected
-            Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
+            Assert.assertTrue(e.getCause() instanceof RuntimeException);
             Assert.assertTrue(e.getCause().getMessage().contains(failureString));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
         }
     }
 
@@ -328,7 +336,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
             randomAccesses[i][1] = second;
         }
 
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+        LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                                                                  DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
@@ -360,7 +368,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
     @Test
     public void testOffloadReadInvalidEntryIds() throws Exception {
         ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+        LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                                                                  DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
@@ -385,46 +393,47 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
     public void testDeleteOffloaded() throws Exception {
         ReadHandle readHandle = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
         UUID uuid = UUID.randomUUID();
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+        LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                                                                  DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
 
         // verify object exist after offload
         offloader.offload(readHandle, uuid, new HashMap<>()).get();
-        Assert.assertTrue(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
-        Assert.assertTrue(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
+        Assert.assertTrue(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+        Assert.assertTrue(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
 
         // verify object deleted after delete
         offloader.deleteOffloaded(readHandle.getId(), uuid).get();
-        Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
-        Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
+        Assert.assertFalse(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+        Assert.assertFalse(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
     }
 
     @Test
     public void testDeleteOffloadedFail() throws Exception {
+        String failureString = "fail deleteOffloaded";
         ReadHandle readHandle = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
         UUID uuid = UUID.randomUUID();
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
-                                                                 DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
-        String failureString = "fail deleteOffloaded";
-        AmazonS3 mockS3client = Mockito.spy(s3client);
+        BlobStore spiedBlobStore = mock(BlobStore.class, delegatesTo(blobStore));
+
         Mockito
-            .doThrow(new AmazonServiceException(failureString))
-            .when(mockS3client).deleteObjects(any());
+            .doThrow(new RuntimeException(failureString))
+            .when(spiedBlobStore).removeBlobs(any(), any());
+
+        LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
+            DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
 
         try {
             // verify object exist after offload
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
-            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
-            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertTrue(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertTrue(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
 
             offloader.deleteOffloaded(readHandle.getId(), uuid).get();
         } catch (Exception e) {
             // expected
-            Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
             Assert.assertTrue(e.getCause().getMessage().contains(failureString));
             // verify object still there.
-            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
-            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertTrue(blobStore.blobExists(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertTrue(blobStore.blobExists(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid)));
         }
     }
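
The delete path stubs removeBlobs(any(), any()), jclouds' bulk-delete call taking a container and an iterable of blob names. A short sketch of how an offloader might use it (the key-naming scheme below is illustrative, not the offloader's actual scheme):

    import com.google.common.collect.ImmutableList;
    import org.jclouds.blobstore.BlobStore;

    class DeleteOffloadedSketch {
        // Removes both objects written for an offloaded ledger in one call.
        static void deleteLedger(BlobStore store, String bucket, long ledgerId, String uuid) {
            store.removeBlobs(bucket, ImmutableList.of(
                    ledgerId + "-" + uuid + "/data",    // hypothetical data-block key
                    ledgerId + "-" + uuid + "/index")); // hypothetical index-block key
        }
    }
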
 
@@ -441,7 +450,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
         Mockito.doReturn(1234L).when(readHandle).getId();
 
         UUID uuid = UUID.randomUUID();
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+        LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                                                                  DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         try {
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
@@ -454,48 +463,51 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
     @Test
     public void testReadUnknownDataVersion() throws Exception {
         ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+        LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                                                                  DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
 
         String dataKey = dataBlockOffloadKey(toWrite.getId(), uuid);
-        ObjectMetadata md = s3client.getObjectMetadata(BUCKET, dataKey);
-        md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345));
-        s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET, dataKey).withNewObjectMetadata(md));
+
+        Map<String, String> userMeta = blobStore.blobMetadata(BUCKET, dataKey).getUserMetadata();
+        userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345));
+        blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, CopyOptions.builder().userMetadata(userMeta).build());
 
         try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) {
             toRead.readAsync(0, 0).get();
             Assert.fail("Shouldn't have been able to read");
         } catch (ExecutionException e) {
+            log.error("Exception: ", e);
             Assert.assertEquals(e.getCause().getClass(), IOException.class);
-            Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
+            Assert.assertTrue(e.getCause().getMessage().contains("Error reading from BlobStore"));
         }
 
-        md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345));
-        s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET, dataKey).withNewObjectMetadata(md));
+        userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345));
+        blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, CopyOptions.builder().userMetadata(userMeta).build());
 
         try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) {
             toRead.readAsync(0, 0).get();
             Assert.fail("Shouldn't have been able to read");
         } catch (ExecutionException e) {
             Assert.assertEquals(e.getCause().getClass(), IOException.class);
-            Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
+            Assert.assertTrue(e.getCause().getMessage().contains("Error reading from BlobStore"));
         }
     }
 
     @Test
     public void testReadUnknownIndexVersion() throws Exception {
         ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+        LedgerOffloader offloader = new BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                                                                  DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
 
         String indexKey = indexBlockOffloadKey(toWrite.getId(), uuid);
-        ObjectMetadata md = s3client.getObjectMetadata(BUCKET, indexKey);
-        md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345));
-        s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET, indexKey).withNewObjectMetadata(md));
+
+        Map<String, String> userMeta = blobStore.blobMetadata(BUCKET, indexKey).getUserMetadata();
+        userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345));
+        blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, CopyOptions.builder().userMetadata(userMeta).build());
 
         try {
             offloader.readOffloaded(toWrite.getId(), uuid).get();
@@ -505,8 +517,8 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
             Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
         }
 
-        md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345));
-        s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET, indexKey).withNewObjectMetadata(md));
+        userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345));
+        blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, CopyOptions.builder().userMetadata(userMeta).build());
 
         try {
             offloader.readOffloaded(toWrite.getId(), uuid).get();
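
The two version tests above rewrite a blob's user metadata via jclouds' copyBlob with CopyOptions, the blob-store analogue of S3's copyObject with withNewObjectMetadata: copying the blob onto itself with a mutated metadata map. A standalone sketch of that idiom (the class name and parameters are illustrative):

    import java.util.Map;

    import org.jclouds.blobstore.BlobStore;
    import org.jclouds.blobstore.options.CopyOptions;

    class RewriteMetadataSketch {
        // Overwrites one user-metadata entry on an existing blob by copying
        // the blob onto itself with the modified metadata map.
        static void setUserMetadata(BlobStore store, String container, String key,
                                    String metaKey, String metaValue) {
            Map<String, String> meta = store.blobMetadata(container, key).getUserMetadata();
            meta.put(metaKey, metaValue);
            store.copyBlob(container, key, container, key,
                           CopyOptions.builder().userMetadata(meta).build());
        }
    }
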
diff --git a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
index 6b8eaa4..ec46571 100644
--- a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
+++ b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
@@ -148,7 +148,9 @@ pulsar-proxy*:
   networkMode: pulsarnet*
 
 s3*:
-  image: adobe/s3mock
+  ## use an image built from the latest adobe/s3mock, to work around https://github.com/adobe/S3Mock/issues/32
+  ## TODO: https://github.com/apache/incubator-pulsar/issues/2133
+  image: apachepulsar/s3mock
   await:
     strategy: org.apache.pulsar.tests.NoopAwaitStrategy
   env:
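
With the image swap above, integration tests still talk to an S3-compatible HTTP endpoint inside the container network. A hedged sketch of pointing jclouds' generic "s3" api at such a mock endpoint (the URL and credentials are placeholders, and 9090 as s3mock's HTTP port is an assumption; s3mock typically accepts arbitrary credentials):

    import org.jclouds.ContextBuilder;
    import org.jclouds.blobstore.BlobStore;
    import org.jclouds.blobstore.BlobStoreContext;

    class S3MockEndpointSketch {
        static BlobStore connect() {
            // "s3" is jclouds' generic S3-compatible api; the endpoint and
            // credentials below are placeholders for the s3mock container.
            BlobStoreContext context = ContextBuilder.newBuilder("s3")
                    .credentials("accessKey", "secretKey")
                    .endpoint("http://localhost:9090")
                    .buildView(BlobStoreContext.class);
            return context.getBlobStore();
        }
    }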
