This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c2b0aa  Trigger offload when managed ledger reaches a size threshold 
(#1960)
5c2b0aa is described below

commit 5c2b0aa14f7c3117f70267b5c04e07901ecc2838
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed Jun 13 21:47:08 2018 +0200

    Trigger offload when managed ledger reaches a size threshold (#1960)
    
    When a managed ledger reaches a certain size, start offloading ledgers
    in the background.
    
    Master Issue: #1511
---
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |  22 +++
 .../bookkeeper/mledger/ManagedLedgerException.java |   6 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  92 ++++++++--
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 189 +++++++++++++++++++++
 4 files changed, 298 insertions(+), 11 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index fc5499b..13d259c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -55,6 +55,7 @@ public class ManagedLedgerConfig {
     private long retentionSizeInMB = 0;
     private boolean autoSkipNonRecoverableData;
     private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4);
+    private long offloadAutoTriggerSizeThresholdBytes = -1;
 
     private DigestType digestType = DigestType.CRC32C;
     private byte[] password = "".getBytes(Charsets.UTF_8);
@@ -410,6 +411,27 @@ public class ManagedLedgerConfig {
     }
 
     /**
+     * Size, in bytes, at which the managed ledger will start to automatically 
offload ledgers to long-term storage.
+     * A negative value disables autotriggering.
+     * Offloading will not occur if no offloader has been set via {@link 
#setLedgerOffloader(LedgerOffloader)}.
+     * Automatic offloading occurs when the ledger is rolled, and the 
ledgers up to that point exceed the threshold.
+     *
+     * @param threshold Threshold in bytes at which offload is automatically 
triggered
+     */
+    public ManagedLedgerConfig setOffloadAutoTriggerSizeThresholdBytes(long 
threshold) {
+        this.offloadAutoTriggerSizeThresholdBytes = threshold;
+        return this;
+    }
+
+    /**
+     * Size, in bytes, at which offloading will automatically be triggered for 
this managed ledger.
+     * @return the trigger threshold, in bytes
+     */
+    public long getOffloadAutoTriggerSizeThresholdBytes() {
+        return this.offloadAutoTriggerSizeThresholdBytes;
+    }
+
+    /**
      * Skip reading non-recoverable/unreadable data-ledger under 
managed-ledger's list. It helps when data-ledgers gets
      * corrupted at bookkeeper and managed-cursor is stuck at that ledger.
      */
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index c4b401a..41aa45b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -127,6 +127,12 @@ public class ManagedLedgerException extends Exception {
         }
     }
 
+    public static class OffloadInProgressException extends 
ManagedLedgerException {
+        public OffloadInProgressException(String msg) {
+            super(msg);
+        }
+    }
+
     @Override
     public synchronized Throwable fillInStackTrace() {
         // Disable stack traces to be filled in
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 65276b3..3e24eb3 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -45,6 +45,7 @@ import java.util.Queue;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
@@ -164,6 +165,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     private final CallbackMutex ledgersListMutex = new CallbackMutex();
     private final CallbackMutex trimmerMutex = new CallbackMutex();
 
+    private final CallbackMutex offloadMutex = new CallbackMutex();
+    private final static CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE
+        = CompletableFuture.completedFuture(PositionImpl.latest);
     private volatile LedgerHandle currentLedger;
     private long currentLedgerEntries = 0;
     private long currentLedgerSize = 0;
@@ -1305,6 +1309,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
         trimConsumedLedgersInBackground();
 
+        maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
+
         if (!pendingAddEntries.isEmpty()) {
             // Need to create a new ledger to write pending entries
             if (log.isDebugEnabled()) {
@@ -1575,6 +1581,63 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         scheduledExecutor.schedule(safeRun(() -> 
trimConsumedLedgersInBackground(promise)), 100, TimeUnit.MILLISECONDS);
     }
 
+    private void maybeOffloadInBackground(CompletableFuture<PositionImpl> 
promise) {
+        if (config.getOffloadAutoTriggerSizeThresholdBytes() > 0) {
+            executor.executeOrdered(name, safeRun(() -> 
maybeOffload(promise)));
+        }
+    }
+
+    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+        if (!offloadMutex.tryLock()) {
+            scheduledExecutor.schedule(safeRun(() -> 
maybeOffloadInBackground(finalPromise)),
+                                       100, TimeUnit.MILLISECONDS);
+        } else {
+            CompletableFuture<PositionImpl> unlockingPromise = new 
CompletableFuture<>();
+            unlockingPromise.whenComplete((res, ex) -> {
+                    offloadMutex.unlock();
+                    if (ex != null) {
+                        finalPromise.completeExceptionally(ex);
+                    } else {
+                        finalPromise.complete(res);
+                    }
+                });
+
+            long threshold = config.getOffloadAutoTriggerSizeThresholdBytes();
+            long sizeSummed = 0;
+            long alreadyOffloadedSize = 0;
+            long toOffloadSize = 0;
+
+            ConcurrentLinkedDeque<LedgerInfo> toOffload = new 
ConcurrentLinkedDeque();
+
+            // go through ledger list from newest to oldest and build a list 
to offload in oldest to newest order
+            for (Map.Entry<Long, LedgerInfo> e : 
ledgers.descendingMap().entrySet()) {
+                long size = e.getValue().getSize();
+                sizeSummed += size;
+                boolean alreadyOffloaded = e.getValue().hasOffloadContext()
+                    && e.getValue().getOffloadContext().getComplete();
+                if (alreadyOffloaded) {
+                    alreadyOffloadedSize += size;
+                } else if (sizeSummed > threshold) {
+                    toOffloadSize += size;
+                    toOffload.addFirst(e.getValue());
+                }
+            }
+
+            if (toOffload.size() > 0) {
+                log.info("[{}] Going to automatically offload ledgers {}"
+                         + ", total size = {}, already offloaded = {}, to 
offload = {}",
+                         name, toOffload.stream().map(l -> 
l.getLedgerId()).collect(Collectors.toList()),
+                         sizeSummed, alreadyOffloadedSize, toOffloadSize);
+            } else {
+                // offloadLoop will complete immediately with an empty list to 
offload
+                log.debug("[{}] Nothing to offload, total size = {}, already 
offloaded = {}, threshold = {}",
+                          name, sizeSummed, alreadyOffloadedSize, threshold);
+            }
+
+            offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, 
Optional.empty());
+        }
+    }
+
     private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
         if (config.getRetentionTimeMillis() < 0) {
             // Negative retention time equates to infinite retention
@@ -2002,18 +2065,25 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
             return;
         }
 
-        log.info("[{}] Going to offload ledgers {}", name,
-                 ledgersToOffload.stream().map(l -> 
l.getLedgerId()).collect(Collectors.toList()));
+        if (offloadMutex.tryLock()) {
+            log.info("[{}] Going to offload ledgers {}", name,
+                     ledgersToOffload.stream().map(l -> 
l.getLedgerId()).collect(Collectors.toList()));
 
-        CompletableFuture<PositionImpl> promise = new CompletableFuture<>();
-        offloadLoop(promise, ledgersToOffload, firstUnoffloaded, 
Optional.empty());
-        promise.whenComplete((result, exception) -> {
-                if (exception != null) {
-                    callback.offloadFailed(new 
ManagedLedgerException(exception), ctx);
-                } else {
-                    callback.offloadComplete(result, ctx);
-                }
-            });
+            CompletableFuture<PositionImpl> promise = new 
CompletableFuture<>();
+            promise.whenComplete((result, exception) -> {
+                    offloadMutex.unlock();
+                    if (exception != null) {
+                        callback.offloadFailed(new 
ManagedLedgerException(exception), ctx);
+                    } else {
+                        callback.offloadComplete(result, ctx);
+                    }
+                });
+            offloadLoop(promise, ledgersToOffload, firstUnoffloaded, 
Optional.empty());
+        } else {
+            callback.offloadFailed(
+                    new 
ManagedLedgerException.OffloadInProgressException("Offload operation already 
running"),
+                    ctx);
+        }
     }
 
     private void offloadLoop(CompletableFuture<PositionImpl> promise, 
Queue<LedgerInfo> ledgersToOffload,
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 76cbbb6..27fe14b 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -676,6 +676,195 @@ public class OffloadPrefixTest extends 
MockedBookKeeperTestCase {
         Assert.assertEquals(offloader.offloadedLedgers(), 
ImmutableSet.of(firstLedgerId, thirdLedgerId));
     }
 
+    private static byte[] buildEntry(int size, String pattern) {
+        byte[] entry = new byte[size];
+        byte[] patternBytes = pattern.getBytes();
+
+        for (int i = 0; i < entry.length; i++) {
+            entry[i] = patternBytes[i % patternBytes.length];
+        }
+        return entry;
+    }
+
+    @Test
+    public void testAutoTriggerOffload() throws Exception {
+        MockLedgerOffloader offloader = new MockLedgerOffloader();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setOffloadAutoTriggerSizeThresholdBytes(100);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setLedgerOffloader(offloader);
+
+        ManagedLedgerImpl ledger = 
(ManagedLedgerImpl)factory.open("my_test_ledger", config);
+
+        // Ledger will roll twice, offload will run on first ledger after 
second closed
+        for (int i = 0; i < 25; i++) {
+            ledger.addEntry(buildEntry(10, "entry-" + i));
+        }
+
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+        // offload should eventually be triggered
+        assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1);
+        Assert.assertEquals(offloader.offloadedLedgers(),
+                            
ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId()));
+    }
+
+    @Test
+    public void manualTriggerWhileAutoInProgress() throws Exception {
+        CompletableFuture<Void> slowOffload = new CompletableFuture<>();
+        CountDownLatch offloadRunning = new CountDownLatch(1);
+        MockLedgerOffloader offloader = new MockLedgerOffloader() {
+                @Override
+                public CompletableFuture<Void> offload(ReadHandle ledger,
+                                                       UUID uuid,
+                                                       Map<String, String> 
extraMetadata) {
+                    offloadRunning.countDown();
+                    return slowOffload.thenCompose((res) -> 
super.offload(ledger, uuid, extraMetadata));
+                }
+            };
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setOffloadAutoTriggerSizeThresholdBytes(100);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setLedgerOffloader(offloader);
+
+        ManagedLedgerImpl ledger = 
(ManagedLedgerImpl)factory.open("my_test_ledger", config);
+
+        // Ledger will roll twice, offload will run on first ledger after 
second closed
+        for (int i = 0; i < 25; i++) {
+            ledger.addEntry(buildEntry(10, "entry-" + i));
+        }
+        offloadRunning.await();
+
+        for (int i = 0; i < 20; i++) {
+            ledger.addEntry(buildEntry(10, "entry-" + i));
+        }
+        Position p = ledger.addEntry(buildEntry(10, "last-entry"));
+
+        try {
+            ledger.offloadPrefix(p);
+            Assert.fail("Shouldn't have succeeded");
+        } catch (ManagedLedgerException.OffloadInProgressException e) {
+            // expected
+        }
+
+        slowOffload.complete(null);
+
+        // eventually all ledgers over the threshold will be offloaded
+        assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 3);
+        Assert.assertEquals(offloader.offloadedLedgers(),
+                            
ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
+                                            
ledger.getLedgersInfoAsList().get(1).getLedgerId(),
+                                            
ledger.getLedgersInfoAsList().get(2).getLedgerId()));
+
+        // then a manual offload can run and offload the one ledger under the 
threshold
+        ledger.offloadPrefix(p);
+
+        Assert.assertEquals(offloader.offloadedLedgers().size(), 4);
+        Assert.assertEquals(offloader.offloadedLedgers(),
+                            
ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
+                                            
ledger.getLedgersInfoAsList().get(1).getLedgerId(),
+                                            
ledger.getLedgersInfoAsList().get(2).getLedgerId(),
+                                            
ledger.getLedgersInfoAsList().get(3).getLedgerId()));
+    }
+
+    @Test
+    public void autoTriggerWhileManualInProgress() throws Exception {
+        CompletableFuture<Void> slowOffload = new CompletableFuture<>();
+        CountDownLatch offloadRunning = new CountDownLatch(1);
+        MockLedgerOffloader offloader = new MockLedgerOffloader() {
+                @Override
+                public CompletableFuture<Void> offload(ReadHandle ledger,
+                                                       UUID uuid,
+                                                       Map<String, String> 
extraMetadata) {
+                    offloadRunning.countDown();
+                    return slowOffload.thenCompose((res) -> 
super.offload(ledger, uuid, extraMetadata));
+                }
+            };
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setOffloadAutoTriggerSizeThresholdBytes(100);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setLedgerOffloader(offloader);
+
+        ManagedLedgerImpl ledger = 
(ManagedLedgerImpl)factory.open("my_test_ledger", config);
+
+        // Ledger rolls once, threshold not hit so auto shouldn't run
+        for (int i = 0; i < 14; i++) {
+            ledger.addEntry(buildEntry(10, "entry-" + i));
+        }
+        Position p = ledger.addEntry(buildEntry(10, "trigger-entry"));
+
+        OffloadCallbackPromise cbPromise = new OffloadCallbackPromise();
+        ledger.asyncOffloadPrefix(p, cbPromise, null);
+        offloadRunning.await();
+
+        // add enough entries to roll the ledger a couple of times and trigger 
some offloads
+        for (int i = 0; i < 20; i++) {
+            ledger.addEntry(buildEntry(10, "entry-" + i));
+        }
+
+        // allow the manual offload to complete
+        slowOffload.complete(null);
+
+        Assert.assertEquals(cbPromise.join(),
+                            
PositionImpl.get(ledger.getLedgersInfoAsList().get(1).getLedgerId(), 0));
+
+        // auto trigger should eventually offload everything else over 
threshold
+        assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
+        Assert.assertEquals(offloader.offloadedLedgers(),
+                            
ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
+                                            
ledger.getLedgersInfoAsList().get(1).getLedgerId()));
+    }
+
+    @Test
+    public void multipleAutoTriggers() throws Exception {
+        CompletableFuture<Void> slowOffload = new CompletableFuture<>();
+        CountDownLatch offloadRunning = new CountDownLatch(1);
+        MockLedgerOffloader offloader = new MockLedgerOffloader() {
+                @Override
+                public CompletableFuture<Void> offload(ReadHandle ledger,
+                                                       UUID uuid,
+                                                       Map<String, String> 
extraMetadata) {
+                    offloadRunning.countDown();
+                    return slowOffload.thenCompose((res) -> 
super.offload(ledger, uuid, extraMetadata));
+                }
+            };
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setOffloadAutoTriggerSizeThresholdBytes(100);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setLedgerOffloader(offloader);
+
+        ManagedLedgerImpl ledger = 
(ManagedLedgerImpl)factory.open("my_test_ledger", config);
+
+        // Ledger will roll twice, offload will run on first ledger after 
second closed
+        for (int i = 0; i < 25; i++) {
+            ledger.addEntry(buildEntry(10, "entry-" + i));
+        }
+        offloadRunning.await();
+
+        // trigger a bunch more rolls. Eventually there will be 5 ledgers.
+        // first 3 should be offloaded, 4th is 100bytes, 5th is 0 bytes.
+        // 4th and 5th sum to 100 bytes so they're just at edge of threshold
+        for (int i = 0; i < 20; i++) {
+            ledger.addEntry(buildEntry(10, "entry-" + i));
+        }
+
+        // allow the first offload to continue
+        slowOffload.complete(null);
+
+        assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 3);
+        Assert.assertEquals(offloader.offloadedLedgers(),
+                            
ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
+                                            
ledger.getLedgersInfoAsList().get(1).getLedgerId(),
+                                            
ledger.getLedgersInfoAsList().get(2).getLedgerId()));
+    }
+
     static void assertEventuallyTrue(BooleanSupplier predicate) throws 
Exception {
         // wait up to 3 seconds
         for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) {

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to