This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 66c8428 Read OffloadedLedger if available (#1640)
66c8428 is described below

commit 66c8428c0c52810a1c834a55a8412e314fcbf387
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Fri Apr 27 19:28:02 2018 +0200

    Read OffloadedLedger if available (#1640)

    * Read OffloadedLedger if available

    If a ledger has been previously offloaded to long term storage, read the ledger from long term storage, rather than from bookkeeper.

    Master issue #1511

    * Fix NPE
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 41 +--
 .../mledger/impl/OffloadPrefixReadTest.java | 289 +++++++++++++++++++++
 2 files changed, 314 insertions(+), 16 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 59f4ad7..a3ec2e5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1361,23 +1361,32 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 mbean.startDataLedgerOpenOp(); CompletableFuture<ReadHandle> promise = new CompletableFuture<>(); - bookKeeper.newOpenLedgerOp() - .withRecovery(true) - .withLedgerId(ledgerId) - .withDigestType(config.getDigestType()) - .withPassword(config.getPassword()).execute() - .whenCompleteAsync((res,ex) -> { - mbean.endDataLedgerOpenOp(); - if (ex != null) { - ledgerCache.remove(ledgerId, promise); - promise.completeExceptionally(createManagedLedgerException(ex)); - } else { - if (log.isDebugEnabled()) { - log.debug("[{}] Successfully opened ledger {} for reading", name, ledgerId); - } - promise.complete(res); + + LedgerInfo info = ledgers.get(ledgerId); + CompletableFuture<ReadHandle> openFuture = new CompletableFuture<>(); + if (info != null && info.hasOffloadContext() && 
info.getOffloadContext().getComplete()) { + UUID uid = new UUID(info.getOffloadContext().getUidMsb(), + info.getOffloadContext().getUidLsb()); + openFuture = config.getLedgerOffloader().readOffloaded(ledgerId, uid); + } else { + openFuture = bookKeeper.newOpenLedgerOp() + .withRecovery(true) + .withLedgerId(ledgerId) + .withDigestType(config.getDigestType()) + .withPassword(config.getPassword()).execute(); + } + openFuture.whenCompleteAsync((res,ex) -> { + mbean.endDataLedgerOpenOp(); + if (ex != null) { + ledgerCache.remove(ledgerId, promise); + promise.completeExceptionally(createManagedLedgerException(ex)); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Successfully opened ledger {} for reading", name, ledgerId); } - }, executor.chooseThread(name)); + promise.complete(res); + } + }, executor.chooseThread(name)); return promise; }); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java new file mode 100644 index 0000000..8bbb44a --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import io.netty.buffer.ByteBuf; + +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.client.api.DigestType; +import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; + +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.LedgerOffloader; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class OffloadPrefixReadTest extends MockedBookKeeperTestCase { + private static final Logger log = LoggerFactory.getLogger(OffloadPrefixReadTest.class); + + @Test + public void testOffloadRead() throws Exception { + 
MockLedgerOffloader offloader = spy(new MockLedgerOffloader()); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setLedgerOffloader(offloader); + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + + for (int i = 0; i < 25; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3); + + ledger.offloadPrefix(ledger.getLastConfirmedEntry()); + + Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3); + Assert.assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()).count(), 2); + Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete()); + Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete()); + + UUID firstLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidMsb(), + ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb()); + UUID secondLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidMsb(), + ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb()); + + ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest); + int i = 0; + for (Entry e : cursor.readEntries(10)) { + Assert.assertEquals(new String(e.getData()), "entry-" + i++); + } + verify(offloader, times(1)).readOffloaded(anyLong(), anyObject()); + verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID)); + + for (Entry e : cursor.readEntries(10)) { + Assert.assertEquals(new String(e.getData()), "entry-" + i++); + } + verify(offloader, times(2)).readOffloaded(anyLong(), anyObject()); + verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID)); + + for (Entry e : cursor.readEntries(5)) { + 
Assert.assertEquals(new String(e.getData()), "entry-" + i++); + } + verify(offloader, times(2)).readOffloaded(anyLong(), anyObject()); + } + + static class MockLedgerOffloader implements LedgerOffloader { + ConcurrentHashMap<UUID, ReadHandle> offloads = new ConcurrentHashMap<UUID, ReadHandle>(); + + @Override + public CompletableFuture<Void> offload(ReadHandle ledger, + UUID uuid, + Map<String, String> extraMetadata) { + CompletableFuture<Void> promise = new CompletableFuture<>(); + try { + offloads.put(uuid, new MockOffloadReadHandle(ledger)); + promise.complete(null); + } catch (Exception e) { + promise.completeExceptionally(e); + } + return promise; + } + + @Override + public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid) { + return CompletableFuture.completedFuture(offloads.get(uuid)); + } + + @Override + public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid) { + offloads.remove(uuid); + return CompletableFuture.completedFuture(null); + }; + } + + static class MockOffloadReadHandle implements ReadHandle { + final long id; + final List<ByteBuf> entries = Lists.newArrayList(); + final LedgerMetadata metadata; + + MockOffloadReadHandle(ReadHandle toCopy) throws Exception { + id = toCopy.getId(); + long lac = toCopy.getLastAddConfirmed(); + try (LedgerEntries entries = toCopy.read(0, lac)) { + for (LedgerEntry e : entries) { + this.entries.add(e.getEntryBuffer().retainedSlice()); + } + } + metadata = new MockMetadata(toCopy.getLedgerMetadata()); + } + + @Override + public long getId() { return id; } + + @Override + public LedgerMetadata getLedgerMetadata() { + return metadata; + } + + @Override + public CompletableFuture<Void> closeAsync() { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) { + List<LedgerEntry> readEntries = Lists.newArrayList(); + for (long eid = firstEntry; eid <= lastEntry; eid++) { + ByteBuf buf = 
entries.get((int)eid).retainedSlice(); + readEntries.add(LedgerEntryImpl.create(id, eid, buf.readableBytes(), buf)); + } + return CompletableFuture.completedFuture(LedgerEntriesImpl.create(readEntries)); + } + + @Override + public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) { + return unsupported(); + } + + @Override + public CompletableFuture<Long> readLastAddConfirmedAsync() { + return unsupported(); + } + + @Override + public CompletableFuture<Long> tryReadLastAddConfirmedAsync() { + return unsupported(); + } + + @Override + public long getLastAddConfirmed() { + return entries.size() - 1; + } + + @Override + public long getLength() { + return metadata.getLength(); + } + + @Override + public boolean isClosed() { + return metadata.isClosed(); + } + + @Override + public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, + long timeOutInMillis, + boolean parallel) { + return unsupported(); + } + + private <T> CompletableFuture<T> unsupported() { + CompletableFuture<T> future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException()); + return future; + } + } + + static class MockMetadata implements LedgerMetadata { + private final int ensembleSize; + private final int writeQuorumSize; + private final int ackQuorumSize; + private final long lastEntryId; + private final long length; + private final DigestType digestType; + private final long ctime; + private final boolean isClosed; + private final Map<String, byte[]> customMetadata; + + MockMetadata(LedgerMetadata toCopy) { + ensembleSize = toCopy.getEnsembleSize(); + writeQuorumSize = toCopy.getWriteQuorumSize(); + ackQuorumSize = toCopy.getAckQuorumSize(); + lastEntryId = toCopy.getLastEntryId(); + length = toCopy.getLength(); + digestType = toCopy.getDigestType(); + ctime = toCopy.getCtime(); + isClosed = toCopy.isClosed(); + + customMetadata = ImmutableMap.copyOf(toCopy.getCustomMetadata()); + } + 
+ @Override + public int getEnsembleSize() { return ensembleSize; } + + @Override + public int getWriteQuorumSize() { return writeQuorumSize; } + + @Override + public int getAckQuorumSize() { return ackQuorumSize; } + + @Override + public long getLastEntryId() { return lastEntryId; } + + @Override + public long getLength() { return length; } + + @Override + public DigestType getDigestType() { return digestType; } + + @Override + public long getCtime() { return ctime; } + + @Override + public boolean isClosed() { return isClosed; } + + @Override + public Map<String, byte[]> getCustomMetadata() { return customMetadata; } + + @Override + public List<BookieSocketAddress> getEnsembleAt(long entryId) { + throw new UnsupportedOperationException("Pulsar shouldn't look at this"); + } + + @Override + public NavigableMap<Long, ? extends List<BookieSocketAddress>> getAllEnsembles() { + throw new UnsupportedOperationException("Pulsar shouldn't look at this"); + } + } +} -- To stop receiving notification emails like this one, please contact si...@apache.org.