merlimat closed pull request #1513: Managed ledger uses ReadHandle in read path. URL: https://github.com/apache/incubator-pulsar/pull/1513
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index b38999b091..9aa5e1debf 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -24,7 +24,7 @@ import com.google.common.base.Charsets; import java.util.Arrays; import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.api.DigestType; /** * Configuration class for a ManagedLedger. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java index bb8f2a651f..0c99650cec 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java @@ -18,7 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; -import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.commons.lang3.tuple.Pair; @@ -94,7 +94,7 @@ * @param ctx * the context object */ - void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, + void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, ReadEntriesCallback callback, Object ctx); /** @@ -111,7 +111,7 @@ 
void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boolean is * @param ctx * the context object */ - void asyncReadEntry(LedgerHandle lh, PositionImpl position, ReadEntryCallback callback, Object ctx); + void asyncReadEntry(ReadHandle lh, PositionImpl position, ReadEntryCallback callback, Object ctx); /** * Get the total size in bytes of all the entries stored in this cache. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java index 37ddc54629..ff80feccf6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java @@ -28,10 +28,11 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import java.util.Collection; +import java.util.Iterator; import java.util.List; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.BKException; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -159,7 +160,7 @@ public void invalidateAllEntries(long ledgerId) { } @Override - public void asyncReadEntry(LedgerHandle lh, PositionImpl position, final ReadEntryCallback callback, + public void asyncReadEntry(ReadHandle lh, PositionImpl position, final ReadEntryCallback callback, final Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}] Reading entry ledger {}: {}", ml.getName(), lh.getId(), position.getEntryId()); @@ -171,37 +172,38 @@ public void asyncReadEntry(LedgerHandle lh, 
PositionImpl position, final ReadEnt manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength()); callback.readEntryComplete(cachedEntry, ctx); } else { - lh.asyncReadEntries(position.getEntryId(), position.getEntryId(), (rc, ledgerHandle, sequence, obj) -> { - if (rc != BKException.Code.OK) { - ml.invalidateLedgerHandle(ledgerHandle, rc); - callback.readEntryFailed(createManagedLedgerException(rc), obj); - return; - } - - if (sequence.hasMoreElements()) { - LedgerEntry ledgerEntry = sequence.nextElement(); - EntryImpl returnEntry = EntryImpl.create(ledgerEntry); - - // The EntryImpl is now the owner of the buffer, so we can release the original one - ledgerEntry.getEntryBuffer().release(); - - manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength()); - ml.mbean.addReadEntriesSample(1, returnEntry.getLength()); - - ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> { - callback.readEntryComplete(returnEntry, obj); - })); - } else { - // got an empty sequence - callback.readEntryFailed(new ManagedLedgerException("Could not read given position"), obj); - } - }, ctx); + lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync( + (ledgerEntries, exception) -> { + if (exception != null) { + ml.invalidateLedgerHandle(lh, exception); + callback.readEntryFailed(createManagedLedgerException(exception), ctx); + return; + } + + try { + Iterator<LedgerEntry> iterator = ledgerEntries.iterator(); + if (iterator.hasNext()) { + LedgerEntry ledgerEntry = iterator.next(); + EntryImpl returnEntry = EntryImpl.create(ledgerEntry); + + manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength()); + ml.mbean.addReadEntriesSample(1, returnEntry.getLength()); + callback.readEntryComplete(returnEntry, ctx); + } else { + // got an empty sequence + callback.readEntryFailed(new ManagedLedgerException("Could not read given position"), + ctx); + } + } finally { + ledgerEntries.close(); + } + }, ml.getExecutor().chooseThread(ml.getName())); } } 
@Override @SuppressWarnings({ "unchecked", "rawtypes" }) - public void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, + public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, final ReadEntriesCallback callback, Object ctx) { final long ledgerId = lh.getId(); final int entriesToRead = (int) (lastEntry - firstEntry) + 1; @@ -239,43 +241,43 @@ public void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boo } // Read all the entries from bookkeeper - lh.asyncReadEntries(firstEntry, lastEntry, (rc, lh1, sequence, cb) -> { - - if (rc != BKException.Code.OK) { - if (rc == BKException.Code.TooManyRequestsException) { - callback.readEntriesFailed(createManagedLedgerException(rc), ctx); - } else { - ml.invalidateLedgerHandle(lh1, rc); - ManagedLedgerException mlException = createManagedLedgerException(rc); - callback.readEntriesFailed(mlException, ctx); - } - return; - } - - checkNotNull(ml.getName()); - checkNotNull(ml.getExecutor()); - ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> { - // We got the entries, we need to transform them to a List<> type - long totalSize = 0; - final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead); - while (sequence.hasMoreElements()) { - // Insert the entries at the end of the list (they will be unsorted for now) - LedgerEntry ledgerEntry = sequence.nextElement(); - EntryImpl entry = EntryImpl.create(ledgerEntry); - ledgerEntry.getEntryBuffer().release(); - - entriesToReturn.add(entry); - - totalSize += entry.getLength(); - - } - - manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize); - ml.getMBean().addReadEntriesSample(entriesToReturn.size(), totalSize); - - callback.readEntriesComplete((List) entriesToReturn, ctx); - })); - }, callback); + lh.readAsync(firstEntry, lastEntry).whenCompleteAsync( + (ledgerEntries, exception) -> { + if (exception != null) { + if 
(exception instanceof BKException + && ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) { + callback.readEntriesFailed(createManagedLedgerException(exception), ctx); + } else { + ml.invalidateLedgerHandle(lh, exception); + ManagedLedgerException mlException = createManagedLedgerException(exception); + callback.readEntriesFailed(mlException, ctx); + } + return; + } + + checkNotNull(ml.getName()); + checkNotNull(ml.getExecutor()); + + try { + // We got the entries, we need to transform them to a List<> type + long totalSize = 0; + final List<EntryImpl> entriesToReturn + = Lists.newArrayListWithExpectedSize(entriesToRead); + for (LedgerEntry e : ledgerEntries) { + EntryImpl entry = EntryImpl.create(e); + + entriesToReturn.add(entry); + totalSize += entry.getLength(); + } + + manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize); + ml.getMBean().addReadEntriesSample(entriesToReturn.size(), totalSize); + + callback.readEntriesComplete((List) entriesToReturn, ctx); + } finally { + ledgerEntries.close(); + } + }, ml.getExecutor().chooseThread(ml.getName())); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java index 9f6f837982..c551002b47 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java @@ -33,8 +33,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; import 
org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; @@ -190,37 +190,35 @@ public void clear() { } @Override - public void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, + public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, final ReadEntriesCallback callback, Object ctx) { - lh.asyncReadEntries(firstEntry, lastEntry, new ReadCallback() { - public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object bkctx) { - if (rc != BKException.Code.OK) { - callback.readEntriesFailed(createManagedLedgerException(rc), ctx); - return; - } - - List<Entry> entries = Lists.newArrayList(); - long totalSize = 0; - while (seq.hasMoreElements()) { - // Insert the entries at the end of the list (they will be unsorted for now) - LedgerEntry ledgerEntry = seq.nextElement(); - EntryImpl entry = EntryImpl.create(ledgerEntry); - ledgerEntry.getEntryBuffer().release(); - - entries.add(entry); - totalSize += entry.getLength(); - } - - mlFactoryMBean.recordCacheMiss(entries.size(), totalSize); - ml.mbean.addReadEntriesSample(entries.size(), totalSize); - - callback.readEntriesComplete(entries, null); - } - }, null); + lh.readAsync(firstEntry, lastEntry).whenComplete( + (ledgerEntries, exception) -> { + if (exception != null) { + callback.readEntriesFailed(createManagedLedgerException(exception), ctx); + return; + } + List<Entry> entries = Lists.newArrayList(); + long totalSize = 0; + try { + for (LedgerEntry e : ledgerEntries) { + // Insert the entries at the end of the list (they will be unsorted for now) + EntryImpl entry = EntryImpl.create(e); + entries.add(entry); + totalSize += entry.getLength(); + } + } finally { + ledgerEntries.close(); + } + mlFactoryMBean.recordCacheMiss(entries.size(), totalSize); + ml.mbean.addReadEntriesSample(entries.size(), totalSize); + + callback.readEntriesComplete(entries, null); + }); } 
@Override - public void asyncReadEntry(LedgerHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, + public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) { } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index d1c6defee9..bf3e925605 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -25,7 +25,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCounted; -import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.mledger.Entry; public final class EntryImpl extends AbstractReferenceCounted implements Entry, Comparable<EntryImpl>, ReferenceCounted { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 462428e1c2..acf17f12c3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -92,6 +92,7 @@ protected final ManagedLedgerConfig config; protected final ManagedLedgerImpl ledger; private final String name; + private final BookKeeper.DigestType digestType; protected volatile PositionImpl markDeletePosition; protected volatile PositionImpl readPosition; @@ -179,6 +180,7 @@ public MarkDeleteEntry(PositionImpl newPosition, Map<String, Long> properties, this.config = config; this.ledger = ledger; this.name = cursorName; + this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType()); STATE_UPDATER.set(this, 
State.Uninitialized); PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0); PENDING_READ_OPS_UPDATER.set(this, 0); @@ -253,7 +255,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac // a new ledger and write the position into it ledger.mbean.startCursorLedgerOpenOp(); long ledgerId = info.getCursorsLedgerId(); - bookkeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { + bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), (rc, lh, ctx) -> { if (log.isDebugEnabled()) { log.debug("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc); } @@ -1924,7 +1926,7 @@ void createNewMetadataLedger(final VoidCallback callback) { ledger.mbean.startCursorLedgerCreateOp(); bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), config.getMetadataWriteQuorumSize(), - config.getMetadataAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { + config.getMetadataAckQuorumSize(), digestType, config.getPassword(), (rc, lh, ctx) -> { ledger.getExecutor().execute(safeRun(() -> { ledger.mbean.endCursorLedgerCreateOp(); if (rc != BKException.Code.OK) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f6c7e3ffd9..77b1dff7b2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -54,6 +54,8 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.common.util.OrderedExecutor; +import org.apache.bookkeeper.client.api.DigestType; +import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.common.util.OrderedScheduler; import 
org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -104,11 +106,12 @@ private final BookKeeper bookKeeper; private final String name; + private final BookKeeper.DigestType digestType; private ManagedLedgerConfig config; private final MetaStore store; - private final ConcurrentLongHashMap<CompletableFuture<LedgerHandle>> ledgerCache = new ConcurrentLongHashMap<>(); + private final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache = new ConcurrentLongHashMap<>(); private final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>(); private volatile Stat ledgersStat; @@ -212,6 +215,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper this.config = config; this.store = store; this.name = name; + this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType()); this.scheduledExecutor = scheduledExecutor; this.executor = orderedExecutor; TOTAL_SIZE_UPDATER.set(this, 0); @@ -278,7 +282,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { log.debug("[{}] Opening legder {}", name, id); } mbean.startDataLedgerOpenOp(); - bookKeeper.asyncOpenLedger(id, config.getDigestType(), config.getPassword(), opencb, null); + bookKeeper.asyncOpenLedger(id, digestType, config.getPassword(), opencb, null); } else { initializeBookKeeper(callback); } @@ -342,7 +346,7 @@ public void operationFailed(MetaStoreException e) { this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), - config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { + digestType, config.getPassword(), (rc, lh, ctx) -> { executor.executeOrdered(name, safeRun(() -> { mbean.endDataLedgerCreateOp(); if (rc != BKException.Code.OK) { @@ -528,7 +532,7 @@ private synchronized void 
internalAsyncAddEntry(OpAddEntry addOperation) { this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), - config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, null, + config.getAckQuorumSize(), digestType, config.getPassword(), this, null, Collections.emptyMap()); } } else { @@ -1263,7 +1267,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) { this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), - config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, null, + config.getAckQuorumSize(), digestType, config.getPassword(), this, null, Collections.emptyMap()); } } @@ -1314,52 +1318,53 @@ void asyncReadEntries(OpReadEntry opReadEntry) { } } - CompletableFuture<LedgerHandle> getLedgerHandle(long ledgerId) { - CompletableFuture<LedgerHandle> ledgerHandle = ledgerCache.get(ledgerId); + CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) { + CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId); if (ledgerHandle != null) { return ledgerHandle; } // If not present try again and create if necessary return ledgerCache.computeIfAbsent(ledgerId, lid -> { - // Open the ledger for reading if it was not already opened - CompletableFuture<LedgerHandle> future = new CompletableFuture<>(); - - if (log.isDebugEnabled()) { - log.debug("[{}] Asynchronously opening ledger {} for read", name, ledgerId); - } - mbean.startDataLedgerOpenOp(); - bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(), - (int rc, LedgerHandle lh, Object ctx) -> { - executor.executeOrdered(name, safeRun(() -> { + // Open the ledger for reading if it was not already opened + if (log.isDebugEnabled()) { + log.debug("[{}] Asynchronously opening ledger {} for 
read", name, ledgerId); + } + mbean.startDataLedgerOpenOp(); + + CompletableFuture<ReadHandle> promise = new CompletableFuture<>(); + bookKeeper.newOpenLedgerOp() + .withRecovery(true) + .withLedgerId(ledgerId) + .withDigestType(config.getDigestType()) + .withPassword(config.getPassword()).execute() + .whenCompleteAsync((res,ex) -> { mbean.endDataLedgerOpenOp(); - if (rc != BKException.Code.OK) { - // Remove the ledger future from cache to give chance to reopen it later - ledgerCache.remove(ledgerId, future); - future.completeExceptionally(createManagedLedgerException(rc)); + if (ex != null) { + ledgerCache.remove(ledgerId, promise); + promise.completeExceptionally(createManagedLedgerException(ex)); } else { if (log.isDebugEnabled()) { - log.debug("[{}] Successfully opened ledger {} for reading", name, lh.getId()); + log.debug("[{}] Successfully opened ledger {} for reading", name, ledgerId); } - future.complete(lh); + promise.complete(res); } - })); - }, null); - return future; - }); + }, executor.chooseThread(name)); + return promise; + }); } - void invalidateLedgerHandle(LedgerHandle ledgerHandle, int rc) { + void invalidateLedgerHandle(ReadHandle ledgerHandle, Throwable t) { long ledgerId = ledgerHandle.getId(); if (ledgerId != currentLedger.getId()) { // remove handle from ledger cache since we got a (read) error ledgerCache.remove(ledgerId); if (log.isDebugEnabled()) { - log.debug("[{}] Removed ledger {} from cache (after read error: {})", name, ledgerId, rc); + log.debug("[{}] Removed ledger {} from cache (after read error)", name, ledgerId, t); } } else { if (log.isDebugEnabled()) { - log.debug("[{}] Ledger that encountered read error {} is current ledger", name, rc); + log.debug("[{}] Ledger that encountered read error is current ledger", name, t); } } } @@ -1384,7 +1389,7 @@ void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ct } - private void internalReadFromLedger(LedgerHandle ledger, OpReadEntry opReadEntry) { + private void 
internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) { // Perform the read long firstEntry = opReadEntry.readPosition.getEntryId(); @@ -2256,6 +2261,14 @@ public static ManagedLedgerException createManagedLedgerException(int bkErrorCod } } + public static ManagedLedgerException createManagedLedgerException(Throwable t) { + if (t instanceof org.apache.bookkeeper.client.api.BKException) { + return createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException)t).getCode()); + } else { + return new ManagedLedgerException("Unknown exception"); + } + } + private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java index 39405af512..06acb5b866 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java @@ -34,6 +34,7 @@ import org.apache.bookkeeper.client.BookKeeperAdmin; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.pulsar.common.naming.TopicName; @@ -52,9 +53,9 @@ private boolean accurate = false; private String brokerName; - public ManagedLedgerOfflineBacklog(BookKeeper.DigestType digestType, byte[] password, String brokerName, + public ManagedLedgerOfflineBacklog(DigestType digestType, byte[] password, String brokerName, boolean accurate) { - this.digestType = digestType; + this.digestType = BookKeeper.DigestType.fromApiDigestType(digestType); this.password = password; this.accurate = accurate; this.brokerName = 
brokerName; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java index 1be2692a6b..c48ea24d88 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java @@ -27,11 +27,13 @@ import java.lang.reflect.Method; import java.util.List; import java.util.Vector; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException; +import org.apache.bookkeeper.client.api.BKException; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -59,7 +61,7 @@ public void setUp(Method method) throws Exception { @Test(timeOut = 5000) void testRead() throws Exception { - LedgerHandle lh = getLedgerHandle(); + ReadHandle lh = getLedgerHandle(); when(lh.getId()).thenReturn((long) 0); EntryCacheManager cacheManager = factory.getEntryCacheManager(); @@ -86,12 +88,12 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { counter.await(); // Verify no entries were read from bookkeeper - verify(lh, never()).asyncReadEntries(anyLong(), anyLong(), any(ReadCallback.class), any()); + verify(lh, never()).readAsync(anyLong(), anyLong()); } @Test(timeOut = 5000) void testReadMissingBefore() throws Exception { - LedgerHandle 
lh = getLedgerHandle(); + ReadHandle lh = getLedgerHandle(); when(lh.getId()).thenReturn((long) 0); EntryCacheManager cacheManager = factory.getEntryCacheManager(); @@ -119,7 +121,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { @Test(timeOut = 5000) void testReadMissingAfter() throws Exception { - LedgerHandle lh = getLedgerHandle(); + ReadHandle lh = getLedgerHandle(); when(lh.getId()).thenReturn((long) 0); EntryCacheManager cacheManager = factory.getEntryCacheManager(); @@ -147,7 +149,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { @Test(timeOut = 5000) void testReadMissingMiddle() throws Exception { - LedgerHandle lh = getLedgerHandle(); + ReadHandle lh = getLedgerHandle(); when(lh.getId()).thenReturn((long) 0); EntryCacheManager cacheManager = factory.getEntryCacheManager(); @@ -176,7 +178,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { @Test(timeOut = 5000) void testReadMissingMultiple() throws Exception { - LedgerHandle lh = getLedgerHandle(); + ReadHandle lh = getLedgerHandle(); when(lh.getId()).thenReturn((long) 0); EntryCacheManager cacheManager = factory.getEntryCacheManager(); @@ -205,18 +207,14 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { @Test(timeOut = 5000) void testReadWithError() throws Exception { - final LedgerHandle lh = getLedgerHandle(); + final ReadHandle lh = getLedgerHandle(); when(lh.getId()).thenReturn((long) 0); - doAnswer(new Answer<Object>() { - public Object answer(InvocationOnMock invocation) { - Object[] args = invocation.getArguments(); - ReadCallback callback = (ReadCallback) args[2]; - Object ctx = args[3]; - callback.readComplete(BKException.Code.NoSuchLedgerExistsException, lh, null, ctx); - return null; - } - }).when(lh).asyncReadEntries(anyLong(), anyLong(), any(ReadCallback.class), any()); + doAnswer((invocation) -> { + CompletableFuture<LedgerEntries> future = new 
CompletableFuture<>(); + future.completeExceptionally(new BKNoSuchLedgerExistsException()); + return future; + }).when(lh).readAsync(anyLong(), anyLong()); EntryCacheManager cacheManager = factory.getEntryCacheManager(); EntryCache entryCache = cacheManager.getEntryCache(ml); @@ -238,29 +236,25 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { counter.await(); } - private static LedgerHandle getLedgerHandle() { - final LedgerHandle lh = mock(LedgerHandle.class); + private static ReadHandle getLedgerHandle() { + final ReadHandle lh = mock(ReadHandle.class); final LedgerEntry ledgerEntry = mock(LedgerEntry.class, Mockito.CALLS_REAL_METHODS); - doReturn(new byte[10]).when(ledgerEntry).getEntry(); doReturn(Unpooled.wrappedBuffer(new byte[10])).when(ledgerEntry).getEntryBuffer(); doReturn((long) 10).when(ledgerEntry).getLength(); - doAnswer(new Answer<Object>() { - public Object answer(InvocationOnMock invocation) { + doAnswer((invocation) -> { Object[] args = invocation.getArguments(); long firstEntry = (Long) args[0]; long lastEntry = (Long) args[1]; - ReadCallback callback = (ReadCallback) args[2]; - Object ctx = args[3]; Vector<LedgerEntry> entries = new Vector<LedgerEntry>(); for (int i = 0; i <= (lastEntry - firstEntry); i++) { entries.add(ledgerEntry); } - callback.readComplete(0, lh, entries.elements(), ctx); - return null; - } - }).when(lh).asyncReadEntries(anyLong(), anyLong(), any(ReadCallback.class), any()); + LedgerEntries ledgerEntries = mock(LedgerEntries.class); + doAnswer((invocation2) -> entries.iterator()).when(ledgerEntries).iterator(); + return CompletableFuture.completedFuture(ledgerEntries); + }).when(lh).readAsync(anyLong(), anyLong()); return lh; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index ad85a35a99..8cf5ee4775 100644 --- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -2473,7 +2473,8 @@ public void operationFailed(ManagedLedgerException exception) { latch2.await(); try { - bkc.openLedgerNoRecovery(ledgerId, mlConfig.getDigestType(), mlConfig.getPassword()); + bkc.openLedgerNoRecovery(ledgerId, DigestType.fromApiDigestType(mlConfig.getDigestType()), + mlConfig.getPassword()); fail("ledger should have deleted due to update-cursor failure"); } catch (BKException e) { // ok diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index 3035c286a5..a5b58d7df9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -33,8 +33,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperTestClient; +import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; import org.apache.bookkeeper.mledger.Entry; @@ -345,7 +346,7 @@ public void ledgerFencedByAutoReplication() throws Exception { PositionImpl p1 = (PositionImpl) ledger.addEntry("entry-1".getBytes()); // Trigger the closure of the data ledger - bkc.openLedger(p1.getLedgerId(), DigestType.CRC32C, new byte[] {}); + bkc.openLedger(p1.getLedgerId(), BookKeeper.DigestType.CRC32C, new byte[] {}); ledger.addEntry("entry-2".getBytes()); @@ -423,8 +424,8 @@ public void 
testOfflineTopicBacklog() throws Exception { entries.forEach(e -> e.release()); ledger.close(); - ManagedLedgerOfflineBacklog offlineTopicBacklog = new ManagedLedgerOfflineBacklog(DigestType.CRC32, - "".getBytes(Charsets.UTF_8), "", false); + ManagedLedgerOfflineBacklog offlineTopicBacklog = new ManagedLedgerOfflineBacklog( + DigestType.CRC32, "".getBytes(Charsets.UTF_8), "", false); PersistentOfflineTopicStats offlineTopicStats = offlineTopicBacklog.getEstimatedUnloadedTopicBacklog( (ManagedLedgerFactoryImpl) factory, "property/cluster/namespace/my-ledger"); factory.shutdown(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java index 4efeefb9c4..35eb986df5 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java @@ -24,7 +24,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.Entry; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index dd9cabdc36..100271487e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -24,7 +24,7 @@ import java.util.Properties; import java.util.Set; -import 
org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.api.DigestType; import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; import org.apache.pulsar.common.configuration.FieldContext; import org.apache.pulsar.common.configuration.PulsarConfiguration; diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java index 038fb4dec3..d830c4ff05 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java @@ -32,7 +32,7 @@ import java.io.PrintWriter; import java.util.Properties; -import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.api.DigestType; import org.apache.pulsar.broker.ServiceConfiguration; import org.testng.annotations.Test; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 3730121098..c07217c9b8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -384,7 +384,7 @@ private String getSchemaPath(String schemaId) { config.getManagedLedgerDefaultEnsembleSize(), config.getManagedLedgerDefaultWriteQuorum(), config.getManagedLedgerDefaultAckQuorum(), - config.getManagedLedgerDigestType(), + BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()), LedgerPassword, (rc, handle, ctx) -> { if (rc != BKException.Code.OK) { @@ -402,7 +402,7 @@ private String getSchemaPath(String schemaId) { 
final CompletableFuture<LedgerHandle> future = new CompletableFuture<>(); bookKeeper.asyncOpenLedger( ledgerId, - config.getManagedLedgerDigestType(), + BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()), LedgerPassword, (rc, handle, ctx) -> { if (rc != BKException.Code.OK) { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java index e975b37586..0ffea79933 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java @@ -49,7 +49,7 @@ import org.HdrHistogram.Histogram; import org.HdrHistogram.Recorder; -import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services