This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 66c8428  Read OffloadedLedger if available (#1640)
66c8428 is described below

commit 66c8428c0c52810a1c834a55a8412e314fcbf387
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Fri Apr 27 19:28:02 2018 +0200

    Read OffloadedLedger if available (#1640)
    
    * Read OffloadedLedger if available
    
    If a ledger has been previously offloaded to long term storage, read
    the ledger from long term storage, rather than from bookkeeper.
    
    Master issue #1511
    
    * Fix NPE
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  41 +--
 .../mledger/impl/OffloadPrefixReadTest.java        | 289 +++++++++++++++++++++
 2 files changed, 314 insertions(+), 16 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 59f4ad7..a3ec2e5 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1361,23 +1361,32 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                 mbean.startDataLedgerOpenOp();
 
                 CompletableFuture<ReadHandle> promise = new 
CompletableFuture<>();
-                bookKeeper.newOpenLedgerOp()
-                    .withRecovery(true)
-                    .withLedgerId(ledgerId)
-                    .withDigestType(config.getDigestType())
-                    .withPassword(config.getPassword()).execute()
-                    .whenCompleteAsync((res,ex) -> {
-                            mbean.endDataLedgerOpenOp();
-                            if (ex != null) {
-                                ledgerCache.remove(ledgerId, promise);
-                                
promise.completeExceptionally(createManagedLedgerException(ex));
-                            } else {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("[{}] Successfully opened ledger 
{} for reading", name, ledgerId);
-                                }
-                                promise.complete(res);
+
+                LedgerInfo info = ledgers.get(ledgerId);
+                CompletableFuture<ReadHandle> openFuture = new 
CompletableFuture<>();
+                if (info != null && info.hasOffloadContext() && 
info.getOffloadContext().getComplete()) {
+                    UUID uid = new UUID(info.getOffloadContext().getUidMsb(),
+                                        info.getOffloadContext().getUidLsb());
+                    openFuture = 
config.getLedgerOffloader().readOffloaded(ledgerId, uid);
+                } else {
+                    openFuture = bookKeeper.newOpenLedgerOp()
+                        .withRecovery(true)
+                        .withLedgerId(ledgerId)
+                        .withDigestType(config.getDigestType())
+                        .withPassword(config.getPassword()).execute();
+                }
+                openFuture.whenCompleteAsync((res,ex) -> {
+                        mbean.endDataLedgerOpenOp();
+                        if (ex != null) {
+                            ledgerCache.remove(ledgerId, promise);
+                            
promise.completeExceptionally(createManagedLedgerException(ex));
+                        } else {
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] Successfully opened ledger {} 
for reading", name, ledgerId);
                             }
-                        }, executor.chooseThread(name));
+                            promise.complete(res);
+                        }
+                    }, executor.chooseThread(name));
                 return promise;
             });
     }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
new file mode 100644
index 0000000..8bbb44a
--- /dev/null
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
+    private static final Logger log = 
LoggerFactory.getLogger(OffloadPrefixReadTest.class);
+
+    @Test
+    public void testOffloadRead() throws Exception {
+        MockLedgerOffloader offloader = spy(new MockLedgerOffloader());
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setLedgerOffloader(offloader);
+        ManagedLedgerImpl ledger = 
(ManagedLedgerImpl)factory.open("my_test_ledger", config);
+
+        for (int i = 0; i < 25; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+        Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+                            .filter(e -> 
e.getOffloadContext().getComplete()).count(), 2);
+        
Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete());
+        
Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete());
+
+        UUID firstLedgerUUID = new 
UUID(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidMsb(),
+                                        
ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb());
+        UUID secondLedgerUUID = new 
UUID(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidMsb(),
+                                         
ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb());
+
+        ManagedCursor cursor = 
ledger.newNonDurableCursor(PositionImpl.earliest);
+        int i = 0;
+        for (Entry e : cursor.readEntries(10)) {
+            Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+        }
+        verify(offloader, times(1)).readOffloaded(anyLong(), anyObject());
+        verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID));
+
+        for (Entry e : cursor.readEntries(10)) {
+            Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+        }
+        verify(offloader, times(2)).readOffloaded(anyLong(), anyObject());
+        verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID));
+
+        for (Entry e : cursor.readEntries(5)) {
+            Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+        }
+        verify(offloader, times(2)).readOffloaded(anyLong(), anyObject());
+    }
+
+    static class MockLedgerOffloader implements LedgerOffloader {
+        ConcurrentHashMap<UUID, ReadHandle> offloads = new 
ConcurrentHashMap<UUID, ReadHandle>();
+
+        @Override
+        public CompletableFuture<Void> offload(ReadHandle ledger,
+                                               UUID uuid,
+                                               Map<String, String> 
extraMetadata) {
+            CompletableFuture<Void> promise = new CompletableFuture<>();
+            try {
+                offloads.put(uuid, new MockOffloadReadHandle(ledger));
+                promise.complete(null);
+            } catch (Exception e) {
+                promise.completeExceptionally(e);
+            }
+            return promise;
+        }
+
+        @Override
+        public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID 
uuid) {
+            return CompletableFuture.completedFuture(offloads.get(uuid));
+        }
+
+        @Override
+        public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID 
uuid) {
+            offloads.remove(uuid);
+            return CompletableFuture.completedFuture(null);
+        };
+    }
+
+    static class MockOffloadReadHandle implements ReadHandle {
+        final long id;
+        final List<ByteBuf> entries = Lists.newArrayList();
+        final LedgerMetadata metadata;
+
+        MockOffloadReadHandle(ReadHandle toCopy) throws Exception {
+            id = toCopy.getId();
+            long lac = toCopy.getLastAddConfirmed();
+            try (LedgerEntries entries = toCopy.read(0, lac)) {
+                for (LedgerEntry e : entries) {
+                    this.entries.add(e.getEntryBuffer().retainedSlice());
+                }
+            }
+            metadata = new MockMetadata(toCopy.getLedgerMetadata());
+        }
+
+        @Override
+        public long getId() { return id; }
+
+        @Override
+        public LedgerMetadata getLedgerMetadata() {
+            return metadata;
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<LedgerEntries> readAsync(long firstEntry, 
long lastEntry) {
+            List<LedgerEntry> readEntries = Lists.newArrayList();
+            for (long eid = firstEntry; eid <= lastEntry; eid++) {
+                ByteBuf buf = entries.get((int)eid).retainedSlice();
+                readEntries.add(LedgerEntryImpl.create(id, eid, 
buf.readableBytes(), buf));
+            }
+            return 
CompletableFuture.completedFuture(LedgerEntriesImpl.create(readEntries));
+        }
+
+        @Override
+        public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long 
firstEntry, long lastEntry) {
+            return unsupported();
+        }
+
+        @Override
+        public CompletableFuture<Long> readLastAddConfirmedAsync() {
+            return unsupported();
+        }
+
+        @Override
+        public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+            return unsupported();
+        }
+
+        @Override
+        public long getLastAddConfirmed() {
+            return entries.size() - 1;
+        }
+
+        @Override
+        public long getLength() {
+            return metadata.getLength();
+        }
+
+        @Override
+        public boolean isClosed() {
+            return metadata.isClosed();
+        }
+
+        @Override
+        public CompletableFuture<LastConfirmedAndEntry> 
readLastAddConfirmedAndEntryAsync(long entryId,
+                                                                               
           long timeOutInMillis,
+                                                                               
           boolean parallel) {
+            return unsupported();
+        }
+
+        private <T> CompletableFuture<T> unsupported() {
+            CompletableFuture<T> future = new CompletableFuture<>();
+            future.completeExceptionally(new UnsupportedOperationException());
+            return future;
+        }
+    }
+
+    static class MockMetadata implements LedgerMetadata {
+        private final int ensembleSize;
+        private final int writeQuorumSize;
+        private final int ackQuorumSize;
+        private final long lastEntryId;
+        private final long length;
+        private final DigestType digestType;
+        private final long ctime;
+        private final boolean isClosed;
+        private final Map<String, byte[]> customMetadata;
+
+        MockMetadata(LedgerMetadata toCopy) {
+            ensembleSize = toCopy.getEnsembleSize();
+            writeQuorumSize = toCopy.getWriteQuorumSize();
+            ackQuorumSize = toCopy.getAckQuorumSize();
+            lastEntryId = toCopy.getLastEntryId();
+            length = toCopy.getLength();
+            digestType = toCopy.getDigestType();
+            ctime = toCopy.getCtime();
+            isClosed = toCopy.isClosed();
+
+            customMetadata = ImmutableMap.copyOf(toCopy.getCustomMetadata());
+        }
+
+        @Override
+        public int getEnsembleSize() { return ensembleSize; }
+
+        @Override
+        public int getWriteQuorumSize() { return writeQuorumSize; }
+
+        @Override
+        public int getAckQuorumSize() { return ackQuorumSize; }
+
+        @Override
+        public long getLastEntryId() { return lastEntryId; }
+
+        @Override
+        public long getLength() { return length; }
+
+        @Override
+        public DigestType getDigestType() { return digestType; }
+
+        @Override
+        public long getCtime() { return ctime; }
+
+        @Override
+        public boolean isClosed() { return isClosed; }
+
+        @Override
+        public Map<String, byte[]> getCustomMetadata() { return 
customMetadata; }
+
+        @Override
+        public List<BookieSocketAddress> getEnsembleAt(long entryId) {
+            throw new UnsupportedOperationException("Pulsar shouldn't look at 
this");
+        }
+
+        @Override
+        public NavigableMap<Long, ? extends List<BookieSocketAddress>> 
getAllEnsembles() {
+            throw new UnsupportedOperationException("Pulsar shouldn't look at 
this");
+        }
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to