sijie closed pull request #2398:  [tiered storage] store driver name and driver 
specific metadata in original ledger metadata
URL: https://github.com/apache/incubator-pulsar/pull/2398
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is reproduced
below (GitHub does not display the original diff for merged fork PRs):

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index 6885500bb8..8fc35cc0b6 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -20,6 +20,7 @@
 
 import com.google.common.annotations.Beta;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -31,6 +32,25 @@
  */
 @Beta
 public interface LedgerOffloader {
+
+    /**
+     * Get offload driver name.
+     *
+     * @return offload driver name.
+     */
+    String getOffloadDriverName();
+
+    /**
+     * Get offload driver metadata.
+     *
+     * <p>The driver metadata will be recorded as part of the metadata of the 
original ledger.
+     *
+     * @return offload driver metadata.
+     */
+    default Map<String, String> getOffloadDriverMetadata() {
+        return Collections.emptyMap();
+    }
+
     /**
      * Offload the passed in ledger to longterm storage.
      * Metadata passed in is for inspection purposes only and should be stored
@@ -51,10 +71,9 @@
      *
      * @param ledger the ledger to offload
      * @param uid unique id to identity this offload attempt
-     * @param extraMetadata metadata to be stored with the ledger for 
informational
+     * @param extraMetadata metadata to be stored with the offloaded ledger 
for informational
      *                      purposes
-     * @return a future, which when completed, denotes that the offload has 
been
-     *         successful
+     * @return a future, which when completed, denotes that the offload has 
been successful.
      */
     CompletableFuture<Void> offload(ReadHandle ledger,
                                     UUID uid,
@@ -69,9 +88,11 @@
      *
      * @param ledgerId the ID of the ledger to load from longterm storage
      * @param uid unique ID for previous successful offload attempt
+     * @param offloadDriverMetadata offload driver metadata
      * @return a future, which when completed, returns a ReadHandle
      */
-    CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid);
+    CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
+                                                Map<String, String> 
offloadDriverMetadata);
 
     /**
      * Delete a ledger from long term storage.
@@ -81,9 +102,11 @@
      *
      * @param ledgerId the ID of the ledger to delete from longterm storage
      * @param uid unique ID for previous offload attempt
+     * @param offloadDriverMetadata offload driver metadata
      * @return a future, which when completed, signifies that the ledger has
      *         been deleted
      */
-    CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid);
+    CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
+                                            Map<String, String> 
offloadDriverMetadata);
 }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 388cdef84c..6e4a6090c5 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -96,6 +96,7 @@
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
+import org.apache.bookkeeper.mledger.offload.OffloadUtils;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
@@ -1390,7 +1391,9 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
                 if (info != null && info.hasOffloadContext() && 
info.getOffloadContext().getComplete()) {
                     UUID uid = new UUID(info.getOffloadContext().getUidMsb(),
                                         info.getOffloadContext().getUidLsb());
-                    openFuture = 
config.getLedgerOffloader().readOffloaded(ledgerId, uid);
+                    // TODO: improve this to load ledger offloader by driver 
name recorded in metadata
+                    openFuture = config.getLedgerOffloader()
+                        .readOffloaded(ledgerId, uid, 
OffloadUtils.getOffloadDriverMetadata(info));
                 } else {
                     openFuture = bookKeeper.newOpenLedgerOp()
                         .withRecovery(!isReadOnly())
@@ -1771,7 +1774,16 @@ void internalTrimConsumedLedgers(CompletableFuture<?> 
promise) {
             }
             for (LedgerInfo ls : offloadedLedgersToDelete) {
                 LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
-                
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
+                newInfoBuilder.getOffloadContextBuilder()
+                    .setBookkeeperDeleted(true);
+                String driverName = OffloadUtils.getOffloadDriverName(
+                    ls, config.getLedgerOffloader().getOffloadDriverName());
+                Map<String, String> driverMetadata = 
OffloadUtils.getOffloadDriverMetadata(
+                    ls, 
config.getLedgerOffloader().getOffloadDriverMetadata());
+                OffloadUtils.setOffloadDriverMetadata(
+                    newInfoBuilder,
+                    driverName, driverMetadata
+                );
                 ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
             }
 
@@ -1903,7 +1915,11 @@ private void asyncDeleteLedger(long ledgerId, LedgerInfo 
info) {
         if (info.getOffloadContext().hasUidMsb()) {
             UUID uuid = new UUID(info.getOffloadContext().getUidMsb(),
                                  info.getOffloadContext().getUidLsb());
-            cleanupOffloaded(ledgerId, uuid, "Trimming");
+            cleanupOffloaded(
+                ledgerId, uuid,
+                OffloadUtils.getOffloadDriverName(info, 
config.getLedgerOffloader().getOffloadDriverName()),
+                OffloadUtils.getOffloadDriverMetadata(info, 
config.getLedgerOffloader().getOffloadDriverMetadata()),
+                "Trimming");
         }
     }
 
@@ -2105,7 +2121,10 @@ private void offloadLoop(CompletableFuture<PositionImpl> 
promise, Queue<LedgerIn
             UUID uuid = UUID.randomUUID();
             Map<String, String> extraMetadata = 
ImmutableMap.of("ManagedLedgerName", name);
 
-            prepareLedgerInfoForOffloaded(ledgerId, uuid)
+            String driverName = 
config.getLedgerOffloader().getOffloadDriverName();
+            Map<String, String> driverMetadata = 
config.getLedgerOffloader().getOffloadDriverMetadata();
+
+            prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, 
driverMetadata)
                 .thenCompose((ignore) -> getLedgerHandle(ledgerId))
                 .thenCompose(readHandle -> 
config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata))
                 .thenCompose((ignore) -> {
@@ -2116,7 +2135,10 @@ private void offloadLoop(CompletableFuture<PositionImpl> 
promise, Queue<LedgerIn
                                            scheduledExecutor, name)
                             .whenComplete((ignore2, exception) -> {
                                     if (exception != null) {
-                                        cleanupOffloaded(ledgerId, uuid, 
"Metastore failure");
+                                        cleanupOffloaded(
+                                            ledgerId, uuid,
+                                            driverName, driverMetadata,
+                                            "Metastore failure");
                                     }
                                 });
                     })
@@ -2216,7 +2238,10 @@ public void operationFailed(MetaStoreException e) {
         }
     }
 
-    private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long 
ledgerId, UUID uuid) {
+    private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long 
ledgerId,
+                                                                  UUID uuid,
+                                                                  String 
offloadDriverName,
+                                                                  Map<String, 
String> offloadDriverMetadata) {
         log.info("[{}] Preparing metadata to offload ledger {} with uuid {}", 
name, ledgerId, uuid);
         return transformLedgerInfo(ledgerId,
                                    (oldInfo) -> {
@@ -2225,12 +2250,24 @@ public void operationFailed(MetaStoreException e) {
                                                                    
oldInfo.getOffloadContext().getUidLsb());
                                            log.info("[{}] Found previous 
offload attempt for ledger {}, uuid {}"
                                                     + ", cleaning up", name, 
ledgerId, uuid);
-                                           cleanupOffloaded(ledgerId, oldUuid, 
"Previous failed offload");
+                                           cleanupOffloaded(
+                                               ledgerId,
+                                               oldUuid,
+                                               
OffloadUtils.getOffloadDriverName(oldInfo,
+                                                   
config.getLedgerOffloader().getOffloadDriverName()),
+                                               
OffloadUtils.getOffloadDriverMetadata(oldInfo,
+                                                   
config.getLedgerOffloader().getOffloadDriverMetadata()),
+                                               "Previous failed offload");
                                        }
                                        LedgerInfo.Builder builder = 
oldInfo.toBuilder();
                                        builder.getOffloadContextBuilder()
                                            
.setUidMsb(uuid.getMostSignificantBits())
                                            
.setUidLsb(uuid.getLeastSignificantBits());
+                                       OffloadUtils.setOffloadDriverMetadata(
+                                           builder,
+                                           offloadDriverName,
+                                           offloadDriverMetadata
+                                       );
                                        return builder.build();
                                    })
             .whenComplete((result, exception) -> {
@@ -2254,6 +2291,16 @@ public void operationFailed(MetaStoreException e) {
                                            builder.getOffloadContextBuilder()
                                                .setTimestamp(clock.millis())
                                                .setComplete(true);
+
+                                           String driverName = 
OffloadUtils.getOffloadDriverName(
+                                               oldInfo, 
config.getLedgerOffloader().getOffloadDriverName());
+                                           Map<String, String> driverMetadata 
= OffloadUtils.getOffloadDriverMetadata(
+                                               oldInfo, 
config.getLedgerOffloader().getOffloadDriverMetadata());
+                                           
OffloadUtils.setOffloadDriverMetadata(
+                                               builder,
+                                               driverName,
+                                               driverMetadata
+                                           );
                                            return builder.build();
                                        } else {
                                            throw new OffloadConflict(
@@ -2272,10 +2319,14 @@ public void operationFailed(MetaStoreException e) {
                 });
     }
 
-    private void cleanupOffloaded(long ledgerId, UUID uuid, String 
cleanupReason) {
+    private void cleanupOffloaded(long ledgerId,
+                                  UUID uuid,
+                                  String offloadDriverName, /* TODO: use 
driver name to identify offloader */
+                                  Map<String, String> offloadDriverMetadata,
+                                  String cleanupReason) {
         Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), 
TimeUnit.SECONDS.toHours(1)).limit(10),
                     Retries.NonFatalPredicate,
-                    () -> 
config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid),
+                    () -> 
config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, 
offloadDriverMetadata),
                     scheduledExecutor, name)
             .whenComplete((ignored, exception) -> {
                     if (exception != null) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
index cd76a1b374..3401f1bbbb 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
@@ -31,6 +31,11 @@
 public class NullLedgerOffloader implements LedgerOffloader {
     public static NullLedgerOffloader INSTANCE = new NullLedgerOffloader();
 
+    @Override
+    public String getOffloadDriverName() {
+        return "NullLedgerOffloader";
+    }
+
     @Override
     public CompletableFuture<Void> offload(ReadHandle ledger,
                                            UUID uid,
@@ -41,14 +46,16 @@
     }
 
     @Override
-    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID 
uid) {
+    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
+                                                       Map<String, String> 
offloadDriverMetadata) {
         CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
         promise.completeExceptionally(new UnsupportedOperationException());
         return promise;
     }
 
     @Override
-    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
+    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
+                                                   Map<String, String> 
offloadDriverMetadata) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         promise.completeExceptionally(new UnsupportedOperationException());
         return promise;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
new file mode 100644
index 0000000000..44ebc80320
--- /dev/null
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.KeyValue;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadDriverMetadata;
+
+public final class OffloadUtils {
+
+    private OffloadUtils() {}
+
+    public static Map<String, String> getOffloadDriverMetadata(LedgerInfo 
ledgerInfo) {
+        Map<String, String> metadata = Maps.newHashMap();
+        if (ledgerInfo.hasOffloadContext()) {
+            OffloadContext ctx = ledgerInfo.getOffloadContext();
+            if (ctx.hasDriverMetadata()) {
+                OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata();
+                if (driverMetadata.getPropertiesCount() > 0) {
+                    driverMetadata.getPropertiesList().forEach(kv -> 
metadata.put(kv.getKey(), kv.getValue()));
+                }
+            }
+        }
+        return metadata;
+    }
+
+    public static Map<String, String> getOffloadDriverMetadata(LedgerInfo 
ledgerInfo,
+                                                               Map<String, 
String> defaultOffloadDriverMetadata) {
+        if (ledgerInfo.hasOffloadContext()) {
+            OffloadContext ctx = ledgerInfo.getOffloadContext();
+            if (ctx.hasDriverMetadata()) {
+                OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata();
+                if (driverMetadata.getPropertiesCount() > 0) {
+                    Map<String, String> metadata = Maps.newHashMap();
+                    driverMetadata.getPropertiesList().forEach(kv -> 
metadata.put(kv.getKey(), kv.getValue()));
+                    return metadata;
+                }
+            }
+        }
+        return defaultOffloadDriverMetadata;
+    }
+
+    public static String getOffloadDriverName(LedgerInfo ledgerInfo, String 
defaultDriverName) {
+        if (ledgerInfo.hasOffloadContext()) {
+            OffloadContext ctx = ledgerInfo.getOffloadContext();
+            if (ctx.hasDriverMetadata()) {
+                OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata();
+                if (driverMetadata.hasName()) {
+                    return driverMetadata.getName();
+                }
+            }
+        }
+        return defaultDriverName;
+    }
+
+    public static void setOffloadDriverMetadata(LedgerInfo.Builder infoBuilder,
+                                                String driverName,
+                                                Map<String, String> 
offloadDriverMetadata) {
+        infoBuilder.getOffloadContextBuilder()
+            .getDriverMetadataBuilder()
+            .setName(driverName);
+        offloadDriverMetadata.forEach((k, v) -> {
+            infoBuilder.getOffloadContextBuilder()
+                .getDriverMetadataBuilder()
+                .addProperties(KeyValue.newBuilder()
+                    .setKey(k)
+                    .setValue(v)
+                    .build());
+        });
+    }
+
+}
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto 
b/managed-ledger/src/main/proto/MLDataFormats.proto
index 0d5ad3a17c..4dbd23154d 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -21,12 +21,23 @@ syntax = "proto2";
 option java_package = "org.apache.bookkeeper.mledger.proto";
 option optimize_for = SPEED;
 
+message KeyValue {
+       required string key = 1;
+       required string value = 2;
+}
+
+message OffloadDriverMetadata {
+    required string name = 1;
+    repeated KeyValue properties = 2;
+}
+
 message OffloadContext {
     optional int64 uidMsb = 1;
     optional int64 uidLsb = 2;
     optional bool complete = 3;
     optional bool bookkeeperDeleted = 4;
     optional int64 timestamp = 5;
+    optional OffloadDriverMetadata driverMetadata = 6;
 }
 
 message ManagedLedgerInfo {
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index 8bbb44ae4b..4bf518f3a1 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyMap;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.spy;
@@ -30,6 +31,7 @@
 
 import io.netty.buffer.ByteBuf;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -51,7 +53,6 @@
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 
@@ -98,24 +99,32 @@ public void testOffloadRead() throws Exception {
         for (Entry e : cursor.readEntries(10)) {
             Assert.assertEquals(new String(e.getData()), "entry-" + i++);
         }
-        verify(offloader, times(1)).readOffloaded(anyLong(), anyObject());
-        verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID));
+        verify(offloader, times(1))
+            .readOffloaded(anyLong(), anyObject(), anyMap());
+        verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID), 
anyMap());
 
         for (Entry e : cursor.readEntries(10)) {
             Assert.assertEquals(new String(e.getData()), "entry-" + i++);
         }
-        verify(offloader, times(2)).readOffloaded(anyLong(), anyObject());
-        verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID));
+        verify(offloader, times(2))
+            .readOffloaded(anyLong(), anyObject(), anyMap());
+        verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), 
anyMap());
 
         for (Entry e : cursor.readEntries(5)) {
             Assert.assertEquals(new String(e.getData()), "entry-" + i++);
         }
-        verify(offloader, times(2)).readOffloaded(anyLong(), anyObject());
+        verify(offloader, times(2))
+            .readOffloaded(anyLong(), anyObject(), anyMap());
     }
 
     static class MockLedgerOffloader implements LedgerOffloader {
         ConcurrentHashMap<UUID, ReadHandle> offloads = new 
ConcurrentHashMap<UUID, ReadHandle>();
 
+        @Override
+        public String getOffloadDriverName() {
+            return "mock";
+        }
+
         @Override
         public CompletableFuture<Void> offload(ReadHandle ledger,
                                                UUID uuid,
@@ -131,12 +140,14 @@ public void testOffloadRead() throws Exception {
         }
 
         @Override
-        public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID 
uuid) {
+        public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID 
uuid,
+                                                           Map<String, String> 
offloadDriverMetadata) {
             return CompletableFuture.completedFuture(offloads.get(uuid));
         }
 
         @Override
-        public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID 
uuid) {
+        public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID 
uuid,
+                                                       Map<String, String> 
offloadDriverMetadata) {
             offloads.remove(uuid);
             return CompletableFuture.completedFuture(null);
         };
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 6d21ee2a7f..351d86b6c3 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -493,9 +493,10 @@ public void testOffloadConflict() throws Exception {
                 }
 
                 @Override
-                public CompletableFuture<Void> deleteOffloaded(long ledgerId, 
UUID uuid) {
+                public CompletableFuture<Void> deleteOffloaded(long ledgerId, 
UUID uuid,
+                                                               Map<String, 
String> offloadDriverMetadata) {
                     deleted.add(Pair.of(ledgerId, uuid));
-                    return super.deleteOffloaded(ledgerId, uuid);
+                    return super.deleteOffloaded(ledgerId, uuid, 
offloadDriverMetadata);
                 }
             };
         ManagedLedgerConfig config = new ManagedLedgerConfig();
@@ -928,6 +929,11 @@ public void offloadFailed(ManagedLedgerException 
exception, Object ctx) {
             return deletes.keySet();
         }
 
+        @Override
+        public String getOffloadDriverName() {
+            return "mock";
+        }
+
         @Override
         public CompletableFuture<Void> offload(ReadHandle ledger,
                                                UUID uuid,
@@ -942,14 +948,16 @@ public void offloadFailed(ManagedLedgerException 
exception, Object ctx) {
         }
 
         @Override
-        public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID 
uuid) {
+        public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID 
uuid,
+                                                           Map<String, String> 
offloadDriverMetadata) {
             CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
             promise.completeExceptionally(new UnsupportedOperationException());
             return promise;
         }
 
         @Override
-        public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID 
uuid) {
+        public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID 
uuid,
+                                                       Map<String, String> 
offloadDriverMetadata) {
             CompletableFuture<Void> promise = new CompletableFuture<>();
             if (offloads.remove(ledgerId, uuid)) {
                 deletes.put(ledgerId, uuid);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index dc925a2756..63b4d84c1b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -23,6 +23,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Sets;
 
@@ -78,6 +79,8 @@ public void cleanup() throws Exception {
 
     private void testOffload(String topicName, String mlName) throws Exception 
{
         LedgerOffloader offloader = mock(LedgerOffloader.class);
+        when(offloader.getOffloadDriverName()).thenReturn("mock");
+
         doReturn(offloader).when(pulsar).getManagedLedgerOffloader();
 
         CompletableFuture<Void> promise = new CompletableFuture<>();
diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml
index eb4636f634..fcd33002e0 100644
--- a/tiered-storage/jcloud/pom.xml
+++ b/tiered-storage/jcloud/pom.xml
@@ -42,6 +42,7 @@
       <groupId>org.apache.pulsar</groupId>
       <artifactId>jclouds-shaded</artifactId>
       <version>${project.version}</version>
+        <!--
       <exclusions>
         <exclusion>
           <groupId>com.google.code.gson</groupId>
@@ -68,6 +69,7 @@
           <artifactId>*</artifactId>
         </exclusion>
       </exclusions>
+      -->
     </dependency>
     <dependency>
       <groupId>com.amazonaws</groupId>
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index e09a66b368..f96afaf363 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -36,6 +36,9 @@
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import lombok.Data;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
@@ -43,6 +46,7 @@
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
+import org.apache.commons.lang3.tuple.Pair;
 import org.jclouds.Constants;
 import org.jclouds.ContextBuilder;
 import org.jclouds.blobstore.BlobStore;
@@ -66,6 +70,10 @@
 public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
     private static final Logger log = 
LoggerFactory.getLogger(BlobStoreManagedLedgerOffloader.class);
 
+    private static final String METADATA_FIELD_BUCKET = "bucket";
+    private static final String METADATA_FIELD_REGION = "region";
+    private static final String METADATA_FIELD_ENDPOINT = "endpoint";
+
     public static final String[] DRIVER_NAMES = {"S3", "aws-s3", 
"google-cloud-storage"};
 
     // use these keys for both s3 and gcs.
@@ -91,6 +99,42 @@ private static void addVersionInfo(BlobBuilder blobBuilder, 
Map<String, String>
         blobBuilder.userMetadata(metadataBuilder.build());
     }
 
+    @Data(staticConstructor = "of")
+    private static class BlobStoreLocation {
+        private final String region;
+        private final String endpoint;
+    }
+
+    private static Pair<BlobStoreLocation, BlobStore> createBlobStore(String 
driver,
+                                                                      String 
region,
+                                                                      String 
endpoint,
+                                                                      
Credentials credentials,
+                                                                      int 
maxBlockSize) {
+        Properties overrides = new Properties();
+        // This property controls the number of parts being uploaded in 
parallel.
+        overrides.setProperty("jclouds.mpu.parallel.degree", "1");
+        overrides.setProperty("jclouds.mpu.parts.size", 
Integer.toString(maxBlockSize));
+        overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000");
+        overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, 
Integer.toString(100));
+
+        ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
+        contextBuilder.credentials(credentials.identity, 
credentials.credential);
+
+        if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) {
+            contextBuilder.endpoint(endpoint);
+            
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
+        }
+        contextBuilder.overrides(overrides);
+        BlobStoreContext context = 
contextBuilder.buildView(BlobStoreContext.class);
+        BlobStore blobStore = context.getBlobStore();
+
+        log.info("Connect to blobstore : driver: {}, region: {}, endpoint: {}",
+            driver, region, endpoint);
+        return Pair.of(
+            BlobStoreLocation.of(region, endpoint),
+            blobStore);
+    }
+
     private final VersionCheck VERSION_CHECK = (key, blob) -> {
         // NOTE all metadata in jclouds comes out as lowercase, in an effort 
to normalize the providers
         String version = 
blob.getMetadata().getUserMetadata().get(METADATA_FORMAT_VERSION_KEY.toLowerCase());
@@ -102,16 +146,28 @@ private static void addVersionInfo(BlobBuilder 
blobBuilder, Map<String, String>
 
     private final OrderedScheduler scheduler;
 
-    // container in jclouds
-    private final String bucket;
+    // container in jclouds to write offloaded ledgers
+    private final String writeBucket;
+    // the region to write offloaded ledgers
+    private final String writeRegion;
+    // the endpoint
+    private final String writeEndpoint;
+    // credentials
+    private final Credentials credentials;
+
     // max block size for each data block.
     private int maxBlockSize;
     private final int readBufferSize;
 
-    private BlobStoreContext context;
-    private BlobStore blobStore;
-    Location location = null;
+    private final BlobStore writeBlobStore;
+    private final Location writeLocation;
+
+    private final ConcurrentMap<BlobStoreLocation, BlobStore> readBlobStores = 
new ConcurrentHashMap<>();
+
+    // metadata to be stored as part of the offloaded ledger metadata
     private final Map<String, String> userMetadata;
+    // offload driver metadata to be stored as part of the original ledger 
metadata
+    private final String offloadDriverName;
 
     @VisibleForTesting
     static BlobStoreManagedLedgerOffloader 
create(TieredStorageConfigurationData conf,
@@ -124,6 +180,9 @@ public static BlobStoreManagedLedgerOffloader 
create(TieredStorageConfigurationD
                                                          OrderedScheduler 
scheduler)
             throws IOException {
         String driver = conf.getManagedLedgerOffloadDriver();
+        if ("s3".equals(driver.toLowerCase())) {
+            driver = "aws-s3";
+        }
         if (!driverSupported(driver)) {
             throw new IOException(
                 "Not support this kind of driver as offload backend: " + 
driver);
@@ -217,35 +276,34 @@ public static Credentials getCredentials(String driver, 
TieredStorageConfigurati
                                     int maxBlockSize, int readBufferSize,
                                     String endpoint, String region, 
Credentials credentials,
                                     Map<String, String> userMetadata) {
+        this.offloadDriverName = driver;
         this.scheduler = scheduler;
         this.readBufferSize = readBufferSize;
-        this.bucket = container;
+        this.writeBucket = container;
+        this.writeRegion = region;
+        this.writeEndpoint = endpoint;
         this.maxBlockSize = maxBlockSize;
         this.userMetadata = userMetadata;
+        this.credentials = credentials;
 
-        Properties overrides = new Properties();
-        // This property controls the number of parts being uploaded in 
parallel.
-        overrides.setProperty("jclouds.mpu.parallel.degree", "1");
-        overrides.setProperty("jclouds.mpu.parts.size", 
Integer.toString(maxBlockSize));
-        overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000");
-        overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, 
Integer.toString(100));
-
-        ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
-        contextBuilder.credentials(credentials.identity, 
credentials.credential);
-
-        if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) {
-            contextBuilder.endpoint(endpoint);
-            
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
-        }
         if (!Strings.isNullOrEmpty(region)) {
-            this.location = new 
LocationBuilder().scope(LocationScope.REGION).id(region).description(region).build();
+            this.writeLocation = new LocationBuilder()
+                .scope(LocationScope.REGION)
+                .id(region)
+                .description(region)
+                .build();
+        } else {
+            this.writeLocation = null;
         }
 
-        log.info("Constructor driver: {}, host: {}, container: {}, region: {} 
",  driver, endpoint, bucket, region);
+        log.info("Constructor offload driver: {}, host: {}, container: {}, 
region: {} ",
+            driver, endpoint, container, region);
 
-        contextBuilder.overrides(overrides);
-        this.context = contextBuilder.buildView(BlobStoreContext.class);
-        this.blobStore = context.getBlobStore();
+        Pair<BlobStoreLocation, BlobStore> blobStore = createBlobStore(
+            driver, region, endpoint, credentials, maxBlockSize
+        );
+        this.writeBlobStore = blobStore.getRight();
+        this.readBlobStores.put(blobStore.getLeft(), blobStore.getRight());
     }
 
     // build context for jclouds BlobStoreContext, mostly used in test
@@ -258,12 +316,22 @@ public static Credentials getCredentials(String driver, 
TieredStorageConfigurati
     BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container, 
OrderedScheduler scheduler,
                                     int maxBlockSize, int readBufferSize,
                                     Map<String, String> userMetadata) {
+        this.offloadDriverName = "aws-s3";
         this.scheduler = scheduler;
         this.readBufferSize = readBufferSize;
-        this.bucket = container;
+        this.writeBucket = container;
+        this.writeRegion = null;
+        this.writeEndpoint = null;
         this.maxBlockSize = maxBlockSize;
-        this.blobStore = blobStore;
+        this.writeBlobStore = blobStore;
+        this.writeLocation = null;
         this.userMetadata = userMetadata;
+        this.credentials = null;
+
+        readBlobStores.put(
+            BlobStoreLocation.of(writeRegion, writeEndpoint),
+            blobStore
+        );
     }
 
     static String dataBlockOffloadKey(long ledgerId, UUID uuid) {
@@ -274,12 +342,26 @@ static String indexBlockOffloadKey(long ledgerId, UUID 
uuid) {
         return String.format("%s-ledger-%d-index", uuid.toString(), ledgerId);
     }
 
-    public boolean createBucket() {
-        return blobStore.createContainerInLocation(location, bucket);
+    public boolean createBucket(String bucket) {
+        return writeBlobStore.createContainerInLocation(writeLocation, bucket);
+    }
+
+    public void deleteBucket(String bucket) {
+        writeBlobStore.deleteContainer(bucket);
+    }
+
+    @Override
+    public String getOffloadDriverName() {
+        return offloadDriverName;
     }
 
-    public void deleteBucket() {
-        blobStore.deleteContainer(bucket);
+    @Override
+    public Map<String, String> getOffloadDriverMetadata() {
+        return ImmutableMap.of(
+            METADATA_FIELD_BUCKET, writeBucket,
+            METADATA_FIELD_REGION, writeRegion,
+            METADATA_FIELD_ENDPOINT, writeEndpoint
+        );
     }
 
     // upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new 
Block,
@@ -305,10 +387,10 @@ public void deleteBucket() {
 
             // init multi part upload for data block.
             try {
-                BlobBuilder blobBuilder = blobStore.blobBuilder(dataBlockKey);
+                BlobBuilder blobBuilder = 
writeBlobStore.blobBuilder(dataBlockKey);
                 addVersionInfo(blobBuilder, userMetadata);
                 Blob blob = blobBuilder.build();
-                mpu = blobStore.initiateMultipartUpload(bucket, 
blob.getMetadata(), new PutOptions());
+                mpu = writeBlobStore.initiateMultipartUpload(writeBucket, 
blob.getMetadata(), new PutOptions());
             } catch (Throwable t) {
                 promise.completeExceptionally(t);
                 return;
@@ -330,9 +412,9 @@ public void deleteBucket() {
                         Payload partPayload = 
Payloads.newInputStreamPayload(blockStream);
                         
partPayload.getContentMetadata().setContentLength((long)blockSize);
                         
partPayload.getContentMetadata().setContentType("application/octet-stream");
-                        parts.add(blobStore.uploadMultipartPart(mpu, partId, 
partPayload));
+                        parts.add(writeBlobStore.uploadMultipartPart(mpu, 
partId, partPayload));
                         log.debug("UploadMultipartPart. container: {}, 
blobName: {}, partId: {}, mpu: {}",
-                            bucket, dataBlockKey, partId, mpu.id());
+                            writeBucket, dataBlockKey, partId, mpu.id());
 
                         indexBuilder.addBlock(startEntry, partId, blockSize);
 
@@ -349,16 +431,16 @@ public void deleteBucket() {
                     dataObjectLength += blockSize;
                 }
 
-                blobStore.completeMultipartUpload(mpu, parts);
+                writeBlobStore.completeMultipartUpload(mpu, parts);
                 mpu = null;
             } catch (Throwable t) {
                 try {
                     if (mpu != null) {
-                        blobStore.abortMultipartUpload(mpu);
+                        writeBlobStore.abortMultipartUpload(mpu);
                     }
                 } catch (Throwable throwable) {
                     log.error("Failed abortMultipartUpload in bucket - {} with 
key - {}, uploadId - {}.",
-                        bucket, dataBlockKey, mpu.id(), throwable);
+                        writeBucket, dataBlockKey, mpu.id(), throwable);
                 }
                 promise.completeExceptionally(t);
                 return;
@@ -368,7 +450,7 @@ public void deleteBucket() {
             try (OffloadIndexBlock index = 
indexBuilder.withDataObjectLength(dataObjectLength).build();
                  OffloadIndexBlock.IndexInputStream indexStream = 
index.toStream()) {
                 // write the index block
-                BlobBuilder blobBuilder = blobStore.blobBuilder(indexBlockKey);
+                BlobBuilder blobBuilder = 
writeBlobStore.blobBuilder(indexBlockKey);
                 addVersionInfo(blobBuilder, userMetadata);
                 Payload indexPayload = 
Payloads.newInputStreamPayload(indexStream);
                 
indexPayload.getContentMetadata().setContentLength((long)indexStream.getStreamSize());
@@ -379,14 +461,14 @@ public void deleteBucket() {
                     .contentLength((long)indexStream.getStreamSize())
                     .build();
 
-                blobStore.putBlob(bucket, blob);
+                writeBlobStore.putBlob(writeBucket, blob);
                 promise.complete(null);
             } catch (Throwable t) {
                 try {
-                    blobStore.removeBlob(bucket, dataBlockKey);
+                    writeBlobStore.removeBlob(writeBucket, dataBlockKey);
                 } catch (Throwable throwable) {
                     log.error("Failed deleteObject in bucket - {} with key - 
{}.",
-                        bucket, dataBlockKey, throwable);
+                        writeBucket, dataBlockKey, throwable);
                 }
                 promise.completeExceptionally(t);
                 return;
@@ -395,16 +477,57 @@ public void deleteBucket() {
         return promise;
     }
 
+    String getReadRegion(Map<String, String> offloadDriverMetadata) {
+        return offloadDriverMetadata.getOrDefault(METADATA_FIELD_REGION, 
writeRegion);
+    }
+
+    String getReadBucket(Map<String, String> offloadDriverMetadata) {
+        return offloadDriverMetadata.getOrDefault(METADATA_FIELD_BUCKET, 
writeBucket);
+    }
+
+    String getReadEndpoint(Map<String, String> offloadDriverMetadata) {
+        return offloadDriverMetadata.getOrDefault(METADATA_FIELD_ENDPOINT, 
writeEndpoint);
+    }
+
+    BlobStore getReadBlobStore(Map<String, String> offloadDriverMetadata) {
+        BlobStoreLocation location = BlobStoreLocation.of(
+            getReadRegion(offloadDriverMetadata),
+            getReadEndpoint(offloadDriverMetadata)
+        );
+        BlobStore blobStore = readBlobStores.get(location);
+        if (null == blobStore) {
+            blobStore = createBlobStore(
+                offloadDriverName,
+                location.getRegion(),
+                location.getEndpoint(),
+                credentials,
+                maxBlockSize
+            ).getRight();
+            BlobStore existingBlobStore = readBlobStores.putIfAbsent(location, 
blobStore);
+            if (null == existingBlobStore) {
+                return blobStore;
+            } else {
+                return existingBlobStore;
+            }
+        } else {
+            return blobStore;
+        }
+    }
+
     @Override
-    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID 
uid) {
+    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
+                                                       Map<String, String> 
offloadDriverMetadata) {
+        String readBucket = getReadBucket(offloadDriverMetadata);
+        BlobStore readBlobstore = getReadBlobStore(offloadDriverMetadata);
+
         CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
         String key = dataBlockOffloadKey(ledgerId, uid);
         String indexKey = indexBlockOffloadKey(ledgerId, uid);
         scheduler.chooseThread(ledgerId).submit(() -> {
                 try {
                     
promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
-                                                                 blobStore,
-                                                                 bucket, key, 
indexKey,
+                                                                 readBlobstore,
+                                                                 readBucket, 
key, indexKey,
                                                                  VERSION_CHECK,
                                                                  ledgerId, 
readBufferSize));
                 } catch (Throwable t) {
@@ -418,11 +541,15 @@ public void deleteBucket() {
 
 
     @Override
-    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
+    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
+                                                   Map<String, String> 
offloadDriverMetadata) {
+        String readBucket = getReadBucket(offloadDriverMetadata);
+        BlobStore readBlobstore = getReadBlobStore(offloadDriverMetadata);
+
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.chooseThread(ledgerId).submit(() -> {
             try {
-                blobStore.removeBlobs(bucket,
+                readBlobstore.removeBlobs(readBucket,
                     ImmutableList.of(dataBlockOffloadKey(ledgerId, uid), 
indexBlockOffloadKey(ledgerId, uid)));
                 promise.complete(null);
             } catch (Throwable t) {
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index fca1ef2b4e..eb88d37411 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -30,6 +30,7 @@
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -252,7 +253,7 @@ public void testOffloadAndRead() throws Exception {
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
 
-        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), 
uuid).get();
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, 
Collections.emptyMap()).get();
         Assert.assertEquals(toTest.getLastAddConfirmed(), 
toWrite.getLastAddConfirmed());
 
         try (LedgerEntries toWriteEntries = toWrite.read(0, 
toWrite.getLastAddConfirmed());
@@ -406,7 +407,7 @@ public void testOffloadReadRandomAccess() throws Exception {
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
 
-        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), 
uuid).get();
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, 
Collections.emptyMap()).get();
         Assert.assertEquals(toTest.getLastAddConfirmed(), 
toWrite.getLastAddConfirmed());
 
         for (long[] access : randomAccesses) {
@@ -438,7 +439,7 @@ public void testOffloadReadInvalidEntryIds() throws 
Exception {
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
 
-        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), 
uuid).get();
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, 
Collections.emptyMap()).get();
         Assert.assertEquals(toTest.getLastAddConfirmed(), 
toWrite.getLastAddConfirmed());
 
         try {
@@ -467,7 +468,7 @@ public void testDeleteOffloaded() throws Exception {
         Assert.assertTrue(blobStore.blobExists(BUCKET, 
BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(), 
uuid)));
 
         // verify object deleted after delete
-        offloader.deleteOffloaded(readHandle.getId(), uuid).get();
+        offloader.deleteOffloaded(readHandle.getId(), uuid, 
Collections.emptyMap()).get();
         Assert.assertFalse(blobStore.blobExists(BUCKET, 
BlobStoreManagedLedgerOffloader.dataBlockOffloadKey(readHandle.getId(), uuid)));
         Assert.assertFalse(blobStore.blobExists(BUCKET, 
BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(), 
uuid)));
     }
@@ -492,7 +493,7 @@ public void testDeleteOffloadedFail() throws Exception {
             Assert.assertTrue(blobStore.blobExists(BUCKET, 
BlobStoreManagedLedgerOffloader.dataBlockOffloadKey(readHandle.getId(), uuid)));
             Assert.assertTrue(blobStore.blobExists(BUCKET, 
BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(), 
uuid)));
 
-            offloader.deleteOffloaded(readHandle.getId(), uuid).get();
+            offloader.deleteOffloaded(readHandle.getId(), uuid, 
Collections.emptyMap()).get();
         } catch (Exception e) {
             // expected
             
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
@@ -542,7 +543,7 @@ public void testReadUnknownDataVersion() throws Exception {
         
userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY.toLowerCase(),
 String.valueOf(-12345));
         blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, 
CopyOptions.builder().userMetadata(userMeta).build());
 
-        try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), 
uuid).get()) {
+        try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), 
uuid, Collections.emptyMap()).get()) {
             toRead.readAsync(0, 0).get();
             Assert.fail("Shouldn't have been able to read");
         } catch (ExecutionException e) {
@@ -554,7 +555,7 @@ public void testReadUnknownDataVersion() throws Exception {
         
userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY.toLowerCase(),
 String.valueOf(12345));
         blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, 
CopyOptions.builder().userMetadata(userMeta).build());
 
-        try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), 
uuid).get()) {
+        try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), 
uuid, Collections.emptyMap()).get()) {
             toRead.readAsync(0, 0).get();
             Assert.fail("Shouldn't have been able to read");
         } catch (ExecutionException e) {
@@ -581,7 +582,7 @@ public void testReadUnknownIndexVersion() throws Exception {
         blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, 
CopyOptions.builder().userMetadata(userMeta).build());
 
         try {
-            offloader.readOffloaded(toWrite.getId(), uuid).get();
+            offloader.readOffloaded(toWrite.getId(), uuid, 
Collections.emptyMap()).get();
             Assert.fail("Shouldn't have been able to open");
         } catch (ExecutionException e) {
             Assert.assertEquals(e.getCause().getClass(), IOException.class);
@@ -592,7 +593,7 @@ public void testReadUnknownIndexVersion() throws Exception {
         blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, 
CopyOptions.builder().userMetadata(userMeta).build());
 
         try {
-            offloader.readOffloaded(toWrite.getId(), uuid).get();
+            offloader.readOffloaded(toWrite.getId(), uuid, 
Collections.emptyMap()).get();
             Assert.fail("Shouldn't have been able to open");
         } catch (ExecutionException e) {
             Assert.assertEquals(e.getCause().getClass(), IOException.class);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to