This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5c2b0aa Trigger offload when managed ledger reaches a size threshold (#1960) 5c2b0aa is described below commit 5c2b0aa14f7c3117f70267b5c04e07901ecc2838 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Wed Jun 13 21:47:08 2018 +0200 Trigger offload when managed ledger reaches a size threshold (#1960) When a managed ledger reaches a certain size, start offloading ledgers in the background. Master Issue: #1511 --- .../bookkeeper/mledger/ManagedLedgerConfig.java | 22 +++ .../bookkeeper/mledger/ManagedLedgerException.java | 6 + .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 92 ++++++++-- .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 189 +++++++++++++++++++++ 4 files changed, 298 insertions(+), 11 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index fc5499b..13d259c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -55,6 +55,7 @@ public class ManagedLedgerConfig { private long retentionSizeInMB = 0; private boolean autoSkipNonRecoverableData; private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4); + private long offloadAutoTriggerSizeThresholdBytes = -1; private DigestType digestType = DigestType.CRC32C; private byte[] password = "".getBytes(Charsets.UTF_8); @@ -410,6 +411,27 @@ public class ManagedLedgerConfig { } /** + * Size, in bytes, at which the managed ledger will start to automatically offload ledgers to longterm storage. + * A negative value disables autotriggering. + * Offloading will not occur if no offloader has been set {@link #setLedgerOffloader(LedgerOffloader)}. 
+ * Automatic offloading occurs when the ledger is rolled and the ledgers up to that point exceed the threshold. + * + * @param threshold Threshold in bytes at which offload is automatically triggered + */ + public ManagedLedgerConfig setOffloadAutoTriggerSizeThresholdBytes(long threshold) { + this.offloadAutoTriggerSizeThresholdBytes = threshold; + return this; + } + + /** + * Size, in bytes, at which offloading will automatically be triggered for this managed ledger. + * @return the trigger threshold, in bytes + */ + public long getOffloadAutoTriggerSizeThresholdBytes() { + return this.offloadAutoTriggerSizeThresholdBytes; + } + + /** * Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets * corrupted at bookkeeper and managed-cursor is stuck at that ledger. */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index c4b401a..41aa45b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -127,6 +127,12 @@ public class ManagedLedgerException extends Exception { } } + public static class OffloadInProgressException extends ManagedLedgerException { + public OffloadInProgressException(String msg) { + super(msg); + } + } + @Override public synchronized Throwable fillInStackTrace() { // Disable stack traces to be filled in diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 65276b3..3e24eb3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -45,6 +45,7
@@ import java.util.Queue; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; @@ -164,6 +165,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final CallbackMutex ledgersListMutex = new CallbackMutex(); private final CallbackMutex trimmerMutex = new CallbackMutex(); + private final CallbackMutex offloadMutex = new CallbackMutex(); + private final static CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE + = CompletableFuture.completedFuture(PositionImpl.latest); private volatile LedgerHandle currentLedger; private long currentLedgerEntries = 0; private long currentLedgerSize = 0; @@ -1305,6 +1309,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { trimConsumedLedgersInBackground(); + maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); + if (!pendingAddEntries.isEmpty()) { // Need to create a new ledger to write pending entries if (log.isDebugEnabled()) { @@ -1575,6 +1581,63 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(promise)), 100, TimeUnit.MILLISECONDS); } + private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) { + if (config.getOffloadAutoTriggerSizeThresholdBytes() > 0) { + executor.executeOrdered(name, safeRun(() -> maybeOffload(promise))); + } + } + + private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) { + if (!offloadMutex.tryLock()) { + scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)), + 100, TimeUnit.MILLISECONDS); + } else { + CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>(); + unlockingPromise.whenComplete((res, ex) -> { + offloadMutex.unlock(); + 
if (ex != null) { + finalPromise.completeExceptionally(ex); + } else { + finalPromise.complete(res); + } + }); + + long threshold = config.getOffloadAutoTriggerSizeThresholdBytes(); + long sizeSummed = 0; + long alreadyOffloadedSize = 0; + long toOffloadSize = 0; + + ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque(); + + // go through ledger list from newest to oldest and build a list to offload in oldest to newest order + for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) { + long size = e.getValue().getSize(); + sizeSummed += size; + boolean alreadyOffloaded = e.getValue().hasOffloadContext() + && e.getValue().getOffloadContext().getComplete(); + if (alreadyOffloaded) { + alreadyOffloadedSize += size; + } else if (sizeSummed > threshold) { + toOffloadSize += size; + toOffload.addFirst(e.getValue()); + } + } + + if (toOffload.size() > 0) { + log.info("[{}] Going to automatically offload ledgers {}" + + ", total size = {}, already offloaded = {}, to offload = {}", + name, toOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList()), + sizeSummed, alreadyOffloadedSize, toOffloadSize); + } else { + // offloadLoop will complete immediately with an empty list to offload + log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}", + name, sizeSummed, alreadyOffloadedSize, threshold); + } + + offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty()); + } + } + private boolean hasLedgerRetentionExpired(long ledgerTimestamp) { if (config.getRetentionTimeMillis() < 0) { // Negative retention time equates to infinite retention @@ -2002,18 +2065,25 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return; } - log.info("[{}] Going to offload ledgers {}", name, - ledgersToOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList())); + if (offloadMutex.tryLock()) { + log.info("[{}] Going to offload ledgers {}", name, + 
ledgersToOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList())); - CompletableFuture<PositionImpl> promise = new CompletableFuture<>(); - offloadLoop(promise, ledgersToOffload, firstUnoffloaded, Optional.empty()); - promise.whenComplete((result, exception) -> { - if (exception != null) { - callback.offloadFailed(new ManagedLedgerException(exception), ctx); - } else { - callback.offloadComplete(result, ctx); - } - }); + CompletableFuture<PositionImpl> promise = new CompletableFuture<>(); + promise.whenComplete((result, exception) -> { + offloadMutex.unlock(); + if (exception != null) { + callback.offloadFailed(new ManagedLedgerException(exception), ctx); + } else { + callback.offloadComplete(result, ctx); + } + }); + offloadLoop(promise, ledgersToOffload, firstUnoffloaded, Optional.empty()); + } else { + callback.offloadFailed( + new ManagedLedgerException.OffloadInProgressException("Offload operation already running"), + ctx); + } } private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload, diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 76cbbb6..27fe14b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -676,6 +676,195 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase { Assert.assertEquals(offloader.offloadedLedgers(), ImmutableSet.of(firstLedgerId, thirdLedgerId)); } + private static byte[] buildEntry(int size, String pattern) { + byte[] entry = new byte[size]; + byte[] patternBytes = pattern.getBytes(); + + for (int i = 0; i < entry.length; i++) { + entry[i] = patternBytes[i % patternBytes.length]; + } + return entry; + } + + @Test + public void testAutoTriggerOffload() throws Exception { + 
MockLedgerOffloader offloader = new MockLedgerOffloader(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setOffloadAutoTriggerSizeThresholdBytes(100); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setLedgerOffloader(offloader); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + + // Ledger will roll twice, offload will run on first ledger after second closed + for (int i = 0; i < 25; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3); + + // offload should eventually be triggered + assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1); + Assert.assertEquals(offloader.offloadedLedgers(), + ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId())); + } + + @Test + public void manualTriggerWhileAutoInProgress() throws Exception { + CompletableFuture<Void> slowOffload = new CompletableFuture<>(); + CountDownLatch offloadRunning = new CountDownLatch(1); + MockLedgerOffloader offloader = new MockLedgerOffloader() { + @Override + public CompletableFuture<Void> offload(ReadHandle ledger, + UUID uuid, + Map<String, String> extraMetadata) { + offloadRunning.countDown(); + return slowOffload.thenCompose((res) -> super.offload(ledger, uuid, extraMetadata)); + } + }; + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setOffloadAutoTriggerSizeThresholdBytes(100); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setLedgerOffloader(offloader); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + + // Ledger will roll twice, offload will run on first ledger after second closed + for (int i = 0; i < 25; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + offloadRunning.await(); + + for (int i = 0; i < 20; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } 
+ Position p = ledger.addEntry(buildEntry(10, "last-entry")); + + try { + ledger.offloadPrefix(p); + Assert.fail("Shouldn't have succeeded"); + } catch (ManagedLedgerException.OffloadInProgressException e) { + // expected + } + + slowOffload.complete(null); + + // eventually all over threshold will be offloaded + assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 3); + Assert.assertEquals(offloader.offloadedLedgers(), + ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(), + ledger.getLedgersInfoAsList().get(1).getLedgerId(), + ledger.getLedgersInfoAsList().get(2).getLedgerId())); + + // then a manual offload can run and offload the one ledger under the threshold + ledger.offloadPrefix(p); + + Assert.assertEquals(offloader.offloadedLedgers().size(), 4); + Assert.assertEquals(offloader.offloadedLedgers(), + ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(), + ledger.getLedgersInfoAsList().get(1).getLedgerId(), + ledger.getLedgersInfoAsList().get(2).getLedgerId(), + ledger.getLedgersInfoAsList().get(3).getLedgerId())); + } + + @Test + public void autoTriggerWhileManualInProgress() throws Exception { + CompletableFuture<Void> slowOffload = new CompletableFuture<>(); + CountDownLatch offloadRunning = new CountDownLatch(1); + MockLedgerOffloader offloader = new MockLedgerOffloader() { + @Override + public CompletableFuture<Void> offload(ReadHandle ledger, + UUID uuid, + Map<String, String> extraMetadata) { + offloadRunning.countDown(); + return slowOffload.thenCompose((res) -> super.offload(ledger, uuid, extraMetadata)); + } + }; + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setOffloadAutoTriggerSizeThresholdBytes(100); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setLedgerOffloader(offloader); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + + // Ledger rolls once, threshold not hit so auto shouldn't run + for 
(int i = 0; i < 14; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + Position p = ledger.addEntry(buildEntry(10, "trigger-entry")); + + OffloadCallbackPromise cbPromise = new OffloadCallbackPromise(); + ledger.asyncOffloadPrefix(p, cbPromise, null); + offloadRunning.await(); + + // add enough entries to roll the ledger a couple of times and trigger some offloads + for (int i = 0; i < 20; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + + // allow the manual offload to complete + slowOffload.complete(null); + + Assert.assertEquals(cbPromise.join(), + PositionImpl.get(ledger.getLedgersInfoAsList().get(1).getLedgerId(), 0)); + + // auto trigger should eventually offload everything else over threshold + assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2); + Assert.assertEquals(offloader.offloadedLedgers(), + ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(), + ledger.getLedgersInfoAsList().get(1).getLedgerId())); + } + + @Test + public void multipleAutoTriggers() throws Exception { + CompletableFuture<Void> slowOffload = new CompletableFuture<>(); + CountDownLatch offloadRunning = new CountDownLatch(1); + MockLedgerOffloader offloader = new MockLedgerOffloader() { + @Override + public CompletableFuture<Void> offload(ReadHandle ledger, + UUID uuid, + Map<String, String> extraMetadata) { + offloadRunning.countDown(); + return slowOffload.thenCompose((res) -> super.offload(ledger, uuid, extraMetadata)); + } + }; + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setOffloadAutoTriggerSizeThresholdBytes(100); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setLedgerOffloader(offloader); + + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + + // Ledger will roll twice, offload will run on first ledger after second closed + for (int i = 0; i < 25; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + 
offloadRunning.await(); + + // trigger a bunch more rolls. Eventually there will be 5 ledgers. + // first 3 should be offloaded, 4th is 100 bytes, 5th is 0 bytes. + // 4th and 5th sum to exactly 100 bytes, which does not exceed the threshold, so they are not offloaded + for (int i = 0; i < 20; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + + // allow the first offload to continue + slowOffload.complete(null); + + assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 3); + Assert.assertEquals(offloader.offloadedLedgers(), + ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(), + ledger.getLedgersInfoAsList().get(1).getLedgerId(), + ledger.getLedgersInfoAsList().get(2).getLedgerId())); + } + static void assertEventuallyTrue(BooleanSupplier predicate) throws Exception { // wait up to 3 seconds for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) { -- To stop receiving notification emails like this one, please contact si...@apache.org.