This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new b982943 Don't offload empty ledgers (#1687) b982943 is described below commit b982943c77972211c937270d305d14f9d12d5f84 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Wed May 2 19:10:25 2018 +0200 Don't offload empty ledgers (#1687) It shouldn't be possible for a ledger in a managed ledger to be empty (it should be cleaned up on recovery), but this patch adds defensive code so that if such ledgers do exist for some reason, they won't be offloaded. Master Issue: #1511 --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 +- .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 43 ++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f1d7100..1a606b7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1966,7 +1966,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { long firstLedgerRetained = current; for (LedgerInfo ls : ledgers.headMap(current).values()) { if (requestOffloadTo.getLedgerId() > ls.getLedgerId()) { - if (!ls.getOffloadContext().getComplete()) { + // don't offload if ledger has already been offloaded, or is empty + if (!ls.getOffloadContext().getComplete() && ls.getSize() > 0) { ledgersToOffload.add(ls); } } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 278182c..76cbbb6 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl; import com.google.common.collect.ImmutableSet; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -40,6 +41,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.tuple.Pair; @@ -633,6 +635,47 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase { assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger)); } + @Test + public void testDontOffloadEmpty() throws Exception { + MockLedgerOffloader offloader = new MockLedgerOffloader(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setLedgerOffloader(offloader); + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + + int i = 0; + for (; i < 35; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 4); + + long firstLedgerId = ledger.getLedgersInfoAsList().get(0).getLedgerId(); + long secondLedgerId = ledger.getLedgersInfoAsList().get(1).getLedgerId(); + long thirdLedgerId = ledger.getLedgersInfoAsList().get(2).getLedgerId(); + long fourthLedgerId = ledger.getLedgersInfoAsList().get(3).getLedgerId(); + + // make an ledger empty + Field ledgersField = ledger.getClass().getDeclaredField("ledgers"); + ledgersField.setAccessible(true); + Map<Long, LedgerInfo> ledgers = 
(Map<Long,LedgerInfo>)ledgersField.get(ledger); + ledgers.put(secondLedgerId, + ledgers.get(secondLedgerId).toBuilder().setEntries(0).setSize(0).build()); + + PositionImpl firstUnoffloaded = (PositionImpl)ledger.offloadPrefix(ledger.getLastConfirmedEntry()); + Assert.assertEquals(firstUnoffloaded.getLedgerId(), fourthLedgerId); + Assert.assertEquals(firstUnoffloaded.getEntryId(), 0); + + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 4); + Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()) + .map(e -> e.getLedgerId()).collect(Collectors.toSet()), + offloader.offloadedLedgers()); + Assert.assertEquals(offloader.offloadedLedgers(), ImmutableSet.of(firstLedgerId, thirdLedgerId)); + } + static void assertEventuallyTrue(BooleanSupplier predicate) throws Exception { // wait up to 3 seconds for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) { -- To stop receiving notification emails like this one, please contact si...@apache.org.