sijie closed pull request #2398: [tiered storage] store driver name and driver specific metadata in original ledger metadata. URL: https://github.com/apache/incubator-pulsar/pull/2398
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java index 6885500bb8..8fc35cc0b6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java @@ -20,6 +20,7 @@ import com.google.common.annotations.Beta; +import java.util.Collections; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -31,6 +32,25 @@ */ @Beta public interface LedgerOffloader { + + /** + * Get offload driver name. + * + * @return offload driver name. + */ + String getOffloadDriverName(); + + /** + * Get offload driver metadata. + * + * <p>The driver metadata will be recorded as part of the metadata of the original ledger. + * + * @return offload driver metadata. + */ + default Map<String, String> getOffloadDriverMetadata() { + return Collections.emptyMap(); + } + /** * Offload the passed in ledger to longterm storage. * Metadata passed in is for inspection purposes only and should be stored @@ -51,10 +71,9 @@ * * @param ledger the ledger to offload * @param uid unique id to identity this offload attempt - * @param extraMetadata metadata to be stored with the ledger for informational + * @param extraMetadata metadata to be stored with the offloaded ledger for informational * purposes - * @return a future, which when completed, denotes that the offload has been - * successful + * @return a future, which when completed, denotes that the offload has been successful. 
*/ CompletableFuture<Void> offload(ReadHandle ledger, UUID uid, @@ -69,9 +88,11 @@ * * @param ledgerId the ID of the ledger to load from longterm storage * @param uid unique ID for previous successful offload attempt + * @param offloadDriverMetadata offload driver metadata * @return a future, which when completed, returns a ReadHandle */ - CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid); + CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid, + Map<String, String> offloadDriverMetadata); /** * Delete a ledger from long term storage. @@ -81,9 +102,11 @@ * * @param ledgerId the ID of the ledger to delete from longterm storage * @param uid unique ID for previous offload attempt + * @param offloadDriverMetadata offload driver metadata * @return a future, which when completed, signifies that the ledger has * been deleted */ - CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid); + CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, + Map<String, String> offloadDriverMetadata); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 388cdef84c..6e4a6090c5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -96,6 +96,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.Stat; +import org.apache.bookkeeper.mledger.offload.OffloadUtils; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo; @@ -1390,7 
+1391,9 @@ void asyncReadEntries(OpReadEntry opReadEntry) { if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) { UUID uid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); - openFuture = config.getLedgerOffloader().readOffloaded(ledgerId, uid); + // TODO: improve this to load ledger offloader by driver name recorded in metadata + openFuture = config.getLedgerOffloader() + .readOffloaded(ledgerId, uid, OffloadUtils.getOffloadDriverMetadata(info)); } else { openFuture = bookKeeper.newOpenLedgerOp() .withRecovery(!isReadOnly()) @@ -1771,7 +1774,16 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) { } for (LedgerInfo ls : offloadedLedgersToDelete) { LedgerInfo.Builder newInfoBuilder = ls.toBuilder(); - newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true); + newInfoBuilder.getOffloadContextBuilder() + .setBookkeeperDeleted(true); + String driverName = OffloadUtils.getOffloadDriverName( + ls, config.getLedgerOffloader().getOffloadDriverName()); + Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata( + ls, config.getLedgerOffloader().getOffloadDriverMetadata()); + OffloadUtils.setOffloadDriverMetadata( + newInfoBuilder, + driverName, driverMetadata + ); ledgers.put(ls.getLedgerId(), newInfoBuilder.build()); } @@ -1903,7 +1915,11 @@ private void asyncDeleteLedger(long ledgerId, LedgerInfo info) { if (info.getOffloadContext().hasUidMsb()) { UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); - cleanupOffloaded(ledgerId, uuid, "Trimming"); + cleanupOffloaded( + ledgerId, uuid, + OffloadUtils.getOffloadDriverName(info, config.getLedgerOffloader().getOffloadDriverName()), + OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()), + "Trimming"); } } @@ -2105,7 +2121,10 @@ private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn UUID uuid = 
UUID.randomUUID(); Map<String, String> extraMetadata = ImmutableMap.of("ManagedLedgerName", name); - prepareLedgerInfoForOffloaded(ledgerId, uuid) + String driverName = config.getLedgerOffloader().getOffloadDriverName(); + Map<String, String> driverMetadata = config.getLedgerOffloader().getOffloadDriverMetadata(); + + prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata) .thenCompose((ignore) -> getLedgerHandle(ledgerId)) .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata)) .thenCompose((ignore) -> { @@ -2116,7 +2135,10 @@ private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn scheduledExecutor, name) .whenComplete((ignore2, exception) -> { if (exception != null) { - cleanupOffloaded(ledgerId, uuid, "Metastore failure"); + cleanupOffloaded( + ledgerId, uuid, + driverName, driverMetadata, + "Metastore failure"); } }); }) @@ -2216,7 +2238,10 @@ public void operationFailed(MetaStoreException e) { } } - private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId, UUID uuid) { + private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId, + UUID uuid, + String offloadDriverName, + Map<String, String> offloadDriverMetadata) { log.info("[{}] Preparing metadata to offload ledger {} with uuid {}", name, ledgerId, uuid); return transformLedgerInfo(ledgerId, (oldInfo) -> { @@ -2225,12 +2250,24 @@ public void operationFailed(MetaStoreException e) { oldInfo.getOffloadContext().getUidLsb()); log.info("[{}] Found previous offload attempt for ledger {}, uuid {}" + ", cleaning up", name, ledgerId, uuid); - cleanupOffloaded(ledgerId, oldUuid, "Previous failed offload"); + cleanupOffloaded( + ledgerId, + oldUuid, + OffloadUtils.getOffloadDriverName(oldInfo, + config.getLedgerOffloader().getOffloadDriverName()), + OffloadUtils.getOffloadDriverMetadata(oldInfo, + config.getLedgerOffloader().getOffloadDriverMetadata()), + "Previous failed offload"); } 
LedgerInfo.Builder builder = oldInfo.toBuilder(); builder.getOffloadContextBuilder() .setUidMsb(uuid.getMostSignificantBits()) .setUidLsb(uuid.getLeastSignificantBits()); + OffloadUtils.setOffloadDriverMetadata( + builder, + offloadDriverName, + offloadDriverMetadata + ); return builder.build(); }) .whenComplete((result, exception) -> { @@ -2254,6 +2291,16 @@ public void operationFailed(MetaStoreException e) { builder.getOffloadContextBuilder() .setTimestamp(clock.millis()) .setComplete(true); + + String driverName = OffloadUtils.getOffloadDriverName( + oldInfo, config.getLedgerOffloader().getOffloadDriverName()); + Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata( + oldInfo, config.getLedgerOffloader().getOffloadDriverMetadata()); + OffloadUtils.setOffloadDriverMetadata( + builder, + driverName, + driverMetadata + ); return builder.build(); } else { throw new OffloadConflict( @@ -2272,10 +2319,14 @@ public void operationFailed(MetaStoreException e) { }); } - private void cleanupOffloaded(long ledgerId, UUID uuid, String cleanupReason) { + private void cleanupOffloaded(long ledgerId, + UUID uuid, + String offloadDriverName, /* TODO: use driver name to identify offloader */ + Map<String, String> offloadDriverMetadata, + String cleanupReason) { Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toHours(1)).limit(10), Retries.NonFatalPredicate, - () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid), + () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, offloadDriverMetadata), scheduledExecutor, name) .whenComplete((ignored, exception) -> { if (exception != null) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java index cd76a1b374..3401f1bbbb 100644 --- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java @@ -31,6 +31,11 @@ public class NullLedgerOffloader implements LedgerOffloader { public static NullLedgerOffloader INSTANCE = new NullLedgerOffloader(); + @Override + public String getOffloadDriverName() { + return "NullLedgerOffloader"; + } + @Override public CompletableFuture<Void> offload(ReadHandle ledger, UUID uid, @@ -41,14 +46,16 @@ } @Override - public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) { + public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid, + Map<String, String> offloadDriverMetadata) { CompletableFuture<ReadHandle> promise = new CompletableFuture<>(); promise.completeExceptionally(new UnsupportedOperationException()); return promise; } @Override - public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) { + public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, + Map<String, String> offloadDriverMetadata) { CompletableFuture<Void> promise = new CompletableFuture<>(); promise.completeExceptionally(new UnsupportedOperationException()); return promise; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java new file mode 100644 index 0000000000..44ebc80320 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.offload; + +import com.google.common.collect.Maps; +import java.util.Map; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.KeyValue; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadDriverMetadata; + +public final class OffloadUtils { + + private OffloadUtils() {} + + public static Map<String, String> getOffloadDriverMetadata(LedgerInfo ledgerInfo) { + Map<String, String> metadata = Maps.newHashMap(); + if (ledgerInfo.hasOffloadContext()) { + OffloadContext ctx = ledgerInfo.getOffloadContext(); + if (ctx.hasDriverMetadata()) { + OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata(); + if (driverMetadata.getPropertiesCount() > 0) { + driverMetadata.getPropertiesList().forEach(kv -> metadata.put(kv.getKey(), kv.getValue())); + } + } + } + return metadata; + } + + public static Map<String, String> getOffloadDriverMetadata(LedgerInfo ledgerInfo, + Map<String, String> defaultOffloadDriverMetadata) { + if (ledgerInfo.hasOffloadContext()) { + OffloadContext ctx = ledgerInfo.getOffloadContext(); + if (ctx.hasDriverMetadata()) { + OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata(); + if (driverMetadata.getPropertiesCount() > 0) { + Map<String, String> metadata = Maps.newHashMap(); + driverMetadata.getPropertiesList().forEach(kv -> metadata.put(kv.getKey(), kv.getValue())); + return metadata; + } + } + } + 
return defaultOffloadDriverMetadata; + } + + public static String getOffloadDriverName(LedgerInfo ledgerInfo, String defaultDriverName) { + if (ledgerInfo.hasOffloadContext()) { + OffloadContext ctx = ledgerInfo.getOffloadContext(); + if (ctx.hasDriverMetadata()) { + OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata(); + if (driverMetadata.hasName()) { + return driverMetadata.getName(); + } + } + } + return defaultDriverName; + } + + public static void setOffloadDriverMetadata(LedgerInfo.Builder infoBuilder, + String driverName, + Map<String, String> offloadDriverMetadata) { + infoBuilder.getOffloadContextBuilder() + .getDriverMetadataBuilder() + .setName(driverName); + offloadDriverMetadata.forEach((k, v) -> { + infoBuilder.getOffloadContextBuilder() + .getDriverMetadataBuilder() + .addProperties(KeyValue.newBuilder() + .setKey(k) + .setValue(v) + .build()); + }); + } + +} diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index 0d5ad3a17c..4dbd23154d 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -21,12 +21,23 @@ syntax = "proto2"; option java_package = "org.apache.bookkeeper.mledger.proto"; option optimize_for = SPEED; +message KeyValue { + required string key = 1; + required string value = 2; +} + +message OffloadDriverMetadata { + required string name = 1; + repeated KeyValue properties = 2; +} + message OffloadContext { optional int64 uidMsb = 1; optional int64 uidLsb = 2; optional bool complete = 3; optional bool bookkeeperDeleted = 4; optional int64 timestamp = 5; + optional OffloadDriverMetadata driverMetadata = 6; } message ManagedLedgerInfo { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java index 8bbb44ae4b..4bf518f3a1 100644 --- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.impl; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyMap; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.spy; @@ -30,6 +31,7 @@ import io.netty.buffer.ByteBuf; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -51,7 +53,6 @@ import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; @@ -98,24 +99,32 @@ public void testOffloadRead() throws Exception { for (Entry e : cursor.readEntries(10)) { Assert.assertEquals(new String(e.getData()), "entry-" + i++); } - verify(offloader, times(1)).readOffloaded(anyLong(), anyObject()); - verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID)); + verify(offloader, times(1)) + .readOffloaded(anyLong(), anyObject(), anyMap()); + verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID), anyMap()); for (Entry e : cursor.readEntries(10)) { Assert.assertEquals(new String(e.getData()), "entry-" + i++); } - verify(offloader, times(2)).readOffloaded(anyLong(), anyObject()); - verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID)); + verify(offloader, times(2)) + .readOffloaded(anyLong(), anyObject(), anyMap()); + verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap()); for (Entry e : cursor.readEntries(5)) { Assert.assertEquals(new String(e.getData()), "entry-" + i++); } - verify(offloader, 
times(2)).readOffloaded(anyLong(), anyObject()); + verify(offloader, times(2)) + .readOffloaded(anyLong(), anyObject(), anyMap()); } static class MockLedgerOffloader implements LedgerOffloader { ConcurrentHashMap<UUID, ReadHandle> offloads = new ConcurrentHashMap<UUID, ReadHandle>(); + @Override + public String getOffloadDriverName() { + return "mock"; + } + @Override public CompletableFuture<Void> offload(ReadHandle ledger, UUID uuid, @@ -131,12 +140,14 @@ public void testOffloadRead() throws Exception { } @Override - public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid) { + public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid, + Map<String, String> offloadDriverMetadata) { return CompletableFuture.completedFuture(offloads.get(uuid)); } @Override - public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid) { + public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid, + Map<String, String> offloadDriverMetadata) { offloads.remove(uuid); return CompletableFuture.completedFuture(null); }; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 6d21ee2a7f..351d86b6c3 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -493,9 +493,10 @@ public void testOffloadConflict() throws Exception { } @Override - public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid) { + public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid, + Map<String, String> offloadDriverMetadata) { deleted.add(Pair.of(ledgerId, uuid)); - return super.deleteOffloaded(ledgerId, uuid); + return super.deleteOffloaded(ledgerId, uuid, offloadDriverMetadata); } }; ManagedLedgerConfig config = new ManagedLedgerConfig(); @@ 
-928,6 +929,11 @@ public void offloadFailed(ManagedLedgerException exception, Object ctx) { return deletes.keySet(); } + @Override + public String getOffloadDriverName() { + return "mock"; + } + @Override public CompletableFuture<Void> offload(ReadHandle ledger, UUID uuid, @@ -942,14 +948,16 @@ public void offloadFailed(ManagedLedgerException exception, Object ctx) { } @Override - public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid) { + public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid, + Map<String, String> offloadDriverMetadata) { CompletableFuture<ReadHandle> promise = new CompletableFuture<>(); promise.completeExceptionally(new UnsupportedOperationException()); return promise; } @Override - public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid) { + public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid, + Map<String, String> offloadDriverMetadata) { CompletableFuture<Void> promise = new CompletableFuture<>(); if (offloads.remove(ledgerId, uuid)) { deletes.put(ledgerId, uuid); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index dc925a2756..63b4d84c1b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.common.collect.Sets; @@ -78,6 +79,8 @@ public void cleanup() throws Exception { private void testOffload(String topicName, String mlName) throws Exception { LedgerOffloader offloader = mock(LedgerOffloader.class); + when(offloader.getOffloadDriverName()).thenReturn("mock"); + 
doReturn(offloader).when(pulsar).getManagedLedgerOffloader(); CompletableFuture<Void> promise = new CompletableFuture<>(); diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml index eb4636f634..fcd33002e0 100644 --- a/tiered-storage/jcloud/pom.xml +++ b/tiered-storage/jcloud/pom.xml @@ -42,6 +42,7 @@ <groupId>org.apache.pulsar</groupId> <artifactId>jclouds-shaded</artifactId> <version>${project.version}</version> + <!-- <exclusions> <exclusion> <groupId>com.google.code.gson</groupId> @@ -68,6 +69,7 @@ <artifactId>*</artifactId> </exclusion> </exclusions> + --> </dependency> <dependency> <groupId>com.amazonaws</groupId> diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index e09a66b368..f96afaf363 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -36,6 +36,9 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import lombok.Data; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloader; @@ -43,6 +46,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock; import org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder; +import org.apache.commons.lang3.tuple.Pair; import org.jclouds.Constants; import org.jclouds.ContextBuilder; import org.jclouds.blobstore.BlobStore; 
@@ -66,6 +70,10 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloader.class); + private static final String METADATA_FIELD_BUCKET = "bucket"; + private static final String METADATA_FIELD_REGION = "region"; + private static final String METADATA_FIELD_ENDPOINT = "endpoint"; + public static final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage"}; // use these keys for both s3 and gcs. @@ -91,6 +99,42 @@ private static void addVersionInfo(BlobBuilder blobBuilder, Map<String, String> blobBuilder.userMetadata(metadataBuilder.build()); } + @Data(staticConstructor = "of") + private static class BlobStoreLocation { + private final String region; + private final String endpoint; + } + + private static Pair<BlobStoreLocation, BlobStore> createBlobStore(String driver, + String region, + String endpoint, + Credentials credentials, + int maxBlockSize) { + Properties overrides = new Properties(); + // This property controls the number of parts being uploaded in parallel. 
+ overrides.setProperty("jclouds.mpu.parallel.degree", "1"); + overrides.setProperty("jclouds.mpu.parts.size", Integer.toString(maxBlockSize)); + overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000"); + overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, Integer.toString(100)); + + ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver); + contextBuilder.credentials(credentials.identity, credentials.credential); + + if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) { + contextBuilder.endpoint(endpoint); + overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false"); + } + contextBuilder.overrides(overrides); + BlobStoreContext context = contextBuilder.buildView(BlobStoreContext.class); + BlobStore blobStore = context.getBlobStore(); + + log.info("Connect to blobstore : driver: {}, region: {}, endpoint: {}", + driver, region, endpoint); + return Pair.of( + BlobStoreLocation.of(region, endpoint), + blobStore); + } + private final VersionCheck VERSION_CHECK = (key, blob) -> { // NOTE all metadata in jclouds comes out as lowercase, in an effort to normalize the providers String version = blob.getMetadata().getUserMetadata().get(METADATA_FORMAT_VERSION_KEY.toLowerCase()); @@ -102,16 +146,28 @@ private static void addVersionInfo(BlobBuilder blobBuilder, Map<String, String> private final OrderedScheduler scheduler; - // container in jclouds - private final String bucket; + // container in jclouds to write offloaded ledgers + private final String writeBucket; + // the region to write offloaded ledgers + private final String writeRegion; + // the endpoint + private final String writeEndpoint; + // credentials + private final Credentials credentials; + // max block size for each data block. 
private int maxBlockSize; private final int readBufferSize; - private BlobStoreContext context; - private BlobStore blobStore; - Location location = null; + private final BlobStore writeBlobStore; + private final Location writeLocation; + + private final ConcurrentMap<BlobStoreLocation, BlobStore> readBlobStores = new ConcurrentHashMap<>(); + + // metadata to be stored as part of the offloaded ledger metadata private final Map<String, String> userMetadata; + // offload driver metadata to be stored as part of the original ledger metadata + private final String offloadDriverName; @VisibleForTesting static BlobStoreManagedLedgerOffloader create(TieredStorageConfigurationData conf, @@ -124,6 +180,9 @@ public static BlobStoreManagedLedgerOffloader create(TieredStorageConfigurationD OrderedScheduler scheduler) throws IOException { String driver = conf.getManagedLedgerOffloadDriver(); + if ("s3".equals(driver.toLowerCase())) { + driver = "aws-s3"; + } if (!driverSupported(driver)) { throw new IOException( "Not support this kind of driver as offload backend: " + driver); @@ -217,35 +276,34 @@ public static Credentials getCredentials(String driver, TieredStorageConfigurati int maxBlockSize, int readBufferSize, String endpoint, String region, Credentials credentials, Map<String, String> userMetadata) { + this.offloadDriverName = driver; this.scheduler = scheduler; this.readBufferSize = readBufferSize; - this.bucket = container; + this.writeBucket = container; + this.writeRegion = region; + this.writeEndpoint = endpoint; this.maxBlockSize = maxBlockSize; this.userMetadata = userMetadata; + this.credentials = credentials; - Properties overrides = new Properties(); - // This property controls the number of parts being uploaded in parallel. 
- overrides.setProperty("jclouds.mpu.parallel.degree", "1"); - overrides.setProperty("jclouds.mpu.parts.size", Integer.toString(maxBlockSize)); - overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000"); - overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, Integer.toString(100)); - - ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver); - contextBuilder.credentials(credentials.identity, credentials.credential); - - if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) { - contextBuilder.endpoint(endpoint); - overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false"); - } if (!Strings.isNullOrEmpty(region)) { - this.location = new LocationBuilder().scope(LocationScope.REGION).id(region).description(region).build(); + this.writeLocation = new LocationBuilder() + .scope(LocationScope.REGION) + .id(region) + .description(region) + .build(); + } else { + this.writeLocation = null; } - log.info("Constructor driver: {}, host: {}, container: {}, region: {} ", driver, endpoint, bucket, region); + log.info("Constructor offload driver: {}, host: {}, container: {}, region: {} ", + driver, endpoint, container, region); - contextBuilder.overrides(overrides); - this.context = contextBuilder.buildView(BlobStoreContext.class); - this.blobStore = context.getBlobStore(); + Pair<BlobStoreLocation, BlobStore> blobStore = createBlobStore( + driver, region, endpoint, credentials, maxBlockSize + ); + this.writeBlobStore = blobStore.getRight(); + this.readBlobStores.put(blobStore.getLeft(), blobStore.getRight()); } // build context for jclouds BlobStoreContext, mostly used in test @@ -258,12 +316,22 @@ public static Credentials getCredentials(String driver, TieredStorageConfigurati BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container, OrderedScheduler scheduler, int maxBlockSize, int readBufferSize, Map<String, String> userMetadata) { + this.offloadDriverName = "aws-s3"; this.scheduler = scheduler; this.readBufferSize = 
readBufferSize; - this.bucket = container; + this.writeBucket = container; + this.writeRegion = null; + this.writeEndpoint = null; this.maxBlockSize = maxBlockSize; - this.blobStore = blobStore; + this.writeBlobStore = blobStore; + this.writeLocation = null; this.userMetadata = userMetadata; + this.credentials = null; + + readBlobStores.put( + BlobStoreLocation.of(writeRegion, writeEndpoint), + blobStore + ); } static String dataBlockOffloadKey(long ledgerId, UUID uuid) { @@ -274,12 +342,26 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) { return String.format("%s-ledger-%d-index", uuid.toString(), ledgerId); } - public boolean createBucket() { - return blobStore.createContainerInLocation(location, bucket); + public boolean createBucket(String bucket) { + return writeBlobStore.createContainerInLocation(writeLocation, bucket); + } + + public void deleteBucket(String bucket) { + writeBlobStore.deleteContainer(bucket); + } + + @Override + public String getOffloadDriverName() { + return offloadDriverName; } - public void deleteBucket() { - blobStore.deleteContainer(bucket); + @Override + public Map<String, String> getOffloadDriverMetadata() { + return ImmutableMap.of( + METADATA_FIELD_BUCKET, writeBucket, + METADATA_FIELD_REGION, writeRegion, + METADATA_FIELD_ENDPOINT, writeEndpoint + ); } // upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new Block, @@ -305,10 +387,10 @@ public void deleteBucket() { // init multi part upload for data block. 
try { - BlobBuilder blobBuilder = blobStore.blobBuilder(dataBlockKey); + BlobBuilder blobBuilder = writeBlobStore.blobBuilder(dataBlockKey); addVersionInfo(blobBuilder, userMetadata); Blob blob = blobBuilder.build(); - mpu = blobStore.initiateMultipartUpload(bucket, blob.getMetadata(), new PutOptions()); + mpu = writeBlobStore.initiateMultipartUpload(writeBucket, blob.getMetadata(), new PutOptions()); } catch (Throwable t) { promise.completeExceptionally(t); return; @@ -330,9 +412,9 @@ public void deleteBucket() { Payload partPayload = Payloads.newInputStreamPayload(blockStream); partPayload.getContentMetadata().setContentLength((long)blockSize); partPayload.getContentMetadata().setContentType("application/octet-stream"); - parts.add(blobStore.uploadMultipartPart(mpu, partId, partPayload)); + parts.add(writeBlobStore.uploadMultipartPart(mpu, partId, partPayload)); log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}", - bucket, dataBlockKey, partId, mpu.id()); + writeBucket, dataBlockKey, partId, mpu.id()); indexBuilder.addBlock(startEntry, partId, blockSize); @@ -349,16 +431,16 @@ public void deleteBucket() { dataObjectLength += blockSize; } - blobStore.completeMultipartUpload(mpu, parts); + writeBlobStore.completeMultipartUpload(mpu, parts); mpu = null; } catch (Throwable t) { try { if (mpu != null) { - blobStore.abortMultipartUpload(mpu); + writeBlobStore.abortMultipartUpload(mpu); } } catch (Throwable throwable) { log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.", - bucket, dataBlockKey, mpu.id(), throwable); + writeBucket, dataBlockKey, mpu.id(), throwable); } promise.completeExceptionally(t); return; @@ -368,7 +450,7 @@ public void deleteBucket() { try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build(); OffloadIndexBlock.IndexInputStream indexStream = index.toStream()) { // write the index block - BlobBuilder blobBuilder = blobStore.blobBuilder(indexBlockKey); + 
BlobBuilder blobBuilder = writeBlobStore.blobBuilder(indexBlockKey); addVersionInfo(blobBuilder, userMetadata); Payload indexPayload = Payloads.newInputStreamPayload(indexStream); indexPayload.getContentMetadata().setContentLength((long)indexStream.getStreamSize()); @@ -379,14 +461,14 @@ public void deleteBucket() { .contentLength((long)indexStream.getStreamSize()) .build(); - blobStore.putBlob(bucket, blob); + writeBlobStore.putBlob(writeBucket, blob); promise.complete(null); } catch (Throwable t) { try { - blobStore.removeBlob(bucket, dataBlockKey); + writeBlobStore.removeBlob(writeBucket, dataBlockKey); } catch (Throwable throwable) { log.error("Failed deleteObject in bucket - {} with key - {}.", - bucket, dataBlockKey, throwable); + writeBucket, dataBlockKey, throwable); } promise.completeExceptionally(t); return; @@ -395,16 +477,57 @@ public void deleteBucket() { return promise; } + String getReadRegion(Map<String, String> offloadDriverMetadata) { + return offloadDriverMetadata.getOrDefault(METADATA_FIELD_REGION, writeRegion); + } + + String getReadBucket(Map<String, String> offloadDriverMetadata) { + return offloadDriverMetadata.getOrDefault(METADATA_FIELD_BUCKET, writeBucket); + } + + String getReadEndpoint(Map<String, String> offloadDriverMetadata) { + return offloadDriverMetadata.getOrDefault(METADATA_FIELD_ENDPOINT, writeEndpoint); + } + + BlobStore getReadBlobStore(Map<String, String> offloadDriverMetadata) { + BlobStoreLocation location = BlobStoreLocation.of( + getReadRegion(offloadDriverMetadata), + getReadEndpoint(offloadDriverMetadata) + ); + BlobStore blobStore = readBlobStores.get(location); + if (null == blobStore) { + blobStore = createBlobStore( + offloadDriverName, + location.getRegion(), + location.getEndpoint(), + credentials, + maxBlockSize + ).getRight(); + BlobStore existingBlobStore = readBlobStores.putIfAbsent(location, blobStore); + if (null == existingBlobStore) { + return blobStore; + } else { + return existingBlobStore; + } + } else 
{ + return blobStore; + } + } + @Override - public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) { + public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid, + Map<String, String> offloadDriverMetadata) { + String readBucket = getReadBucket(offloadDriverMetadata); + BlobStore readBlobstore = getReadBlobStore(offloadDriverMetadata); + CompletableFuture<ReadHandle> promise = new CompletableFuture<>(); String key = dataBlockOffloadKey(ledgerId, uid); String indexKey = indexBlockOffloadKey(ledgerId, uid); scheduler.chooseThread(ledgerId).submit(() -> { try { promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId), - blobStore, - bucket, key, indexKey, + readBlobstore, + readBucket, key, indexKey, VERSION_CHECK, ledgerId, readBufferSize)); } catch (Throwable t) { @@ -418,11 +541,15 @@ public void deleteBucket() { @Override - public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) { + public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, + Map<String, String> offloadDriverMetadata) { + String readBucket = getReadBucket(offloadDriverMetadata); + BlobStore readBlobstore = getReadBlobStore(offloadDriverMetadata); + CompletableFuture<Void> promise = new CompletableFuture<>(); scheduler.chooseThread(ledgerId).submit(() -> { try { - blobStore.removeBlobs(bucket, + readBlobstore.removeBlobs(readBucket, ImmutableList.of(dataBlockOffloadKey(ledgerId, uid), indexBlockOffloadKey(ledgerId, uid))); promise.complete(null); } catch (Throwable t) { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index fca1ef2b4e..eb88d37411 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -252,7 +253,7 @@ public void testOffloadAndRead() throws Exception { UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); - ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get(); + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get(); Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); try (LedgerEntries toWriteEntries = toWrite.read(0, toWrite.getLastAddConfirmed()); @@ -406,7 +407,7 @@ public void testOffloadReadRandomAccess() throws Exception { UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); - ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get(); + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get(); Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); for (long[] access : randomAccesses) { @@ -438,7 +439,7 @@ public void testOffloadReadInvalidEntryIds() throws Exception { UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, new HashMap<>()).get(); - ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get(); + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get(); Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); try { @@ -467,7 +468,7 @@ public void testDeleteOffloaded() throws Exception { Assert.assertTrue(blobStore.blobExists(BUCKET, BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(), uuid))); // verify object deleted after delete - 
offloader.deleteOffloaded(readHandle.getId(), uuid).get(); + offloader.deleteOffloaded(readHandle.getId(), uuid, Collections.emptyMap()).get(); Assert.assertFalse(blobStore.blobExists(BUCKET, BlobStoreManagedLedgerOffloader.dataBlockOffloadKey(readHandle.getId(), uuid))); Assert.assertFalse(blobStore.blobExists(BUCKET, BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(), uuid))); } @@ -492,7 +493,7 @@ public void testDeleteOffloadedFail() throws Exception { Assert.assertTrue(blobStore.blobExists(BUCKET, BlobStoreManagedLedgerOffloader.dataBlockOffloadKey(readHandle.getId(), uuid))); Assert.assertTrue(blobStore.blobExists(BUCKET, BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(), uuid))); - offloader.deleteOffloaded(readHandle.getId(), uuid).get(); + offloader.deleteOffloaded(readHandle.getId(), uuid, Collections.emptyMap()).get(); } catch (Exception e) { // expected Assert.assertTrue(e.getCause().getMessage().contains(failureString)); @@ -542,7 +543,7 @@ public void testReadUnknownDataVersion() throws Exception { userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY.toLowerCase(), String.valueOf(-12345)); blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, CopyOptions.builder().userMetadata(userMeta).build()); - try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) { + try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get()) { toRead.readAsync(0, 0).get(); Assert.fail("Shouldn't have been able to read"); } catch (ExecutionException e) { @@ -554,7 +555,7 @@ public void testReadUnknownDataVersion() throws Exception { userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY.toLowerCase(), String.valueOf(12345)); blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, CopyOptions.builder().userMetadata(userMeta).build()); - try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) { + try (ReadHandle 
toRead = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get()) { toRead.readAsync(0, 0).get(); Assert.fail("Shouldn't have been able to read"); } catch (ExecutionException e) { @@ -581,7 +582,7 @@ public void testReadUnknownIndexVersion() throws Exception { blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, CopyOptions.builder().userMetadata(userMeta).build()); try { - offloader.readOffloaded(toWrite.getId(), uuid).get(); + offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get(); Assert.fail("Shouldn't have been able to open"); } catch (ExecutionException e) { Assert.assertEquals(e.getCause().getClass(), IOException.class); @@ -592,7 +593,7 @@ public void testReadUnknownIndexVersion() throws Exception { blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, CopyOptions.builder().userMetadata(userMeta).build()); try { - offloader.readOffloaded(toWrite.getId(), uuid).get(); + offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get(); Assert.fail("Shouldn't have been able to open"); } catch (ExecutionException e) { Assert.assertEquals(e.getCause().getClass(), IOException.class); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services