Repository: bookkeeper Updated Branches: refs/heads/master 8a3922e00 -> 355ddbc7c
BOOKKEEPER-867: New Client API to allow applications pass-in EntryId. (Venkateswararao Jujjuri via sijie) Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/355ddbc7 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/355ddbc7 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/355ddbc7 Branch: refs/heads/master Commit: 355ddbc7c13e39c49a58371d467e87aa80f696d7 Parents: 8a3922e Author: Sijie Guo <si...@apache.org> Authored: Thu Oct 22 00:22:36 2015 -0700 Committer: Sijie Guo <si...@apache.org> Committed: Thu Oct 22 00:22:36 2015 -0700 ---------------------------------------------------------------------- .../apache/bookkeeper/client/BKException.java | 11 + .../apache/bookkeeper/client/BookKeeper.java | 93 +++++ .../bookkeeper/client/LedgerCreateOp.java | 15 +- .../apache/bookkeeper/client/LedgerHandle.java | 111 +++++- .../bookkeeper/client/LedgerHandleAdv.java | 214 +++++++++++ .../apache/bookkeeper/client/PendingAddOp.java | 17 + .../client/BookieWriteLedgerTest.java | 365 ++++++++++++++++++- 7 files changed, 815 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java index 3991085..b2355cd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java @@ -96,6 +96,8 @@ public abstract class BKException extends Exception { return new BKIllegalOpException(); case Code.AddEntryQuorumTimeoutException: return new 
BKAddEntryQuorumTimeoutException(); + case Code.DuplicateEntryIdException: + return new BKDuplicateEntryIdException(); default: return new BKUnexpectedConditionException(); } @@ -128,6 +130,7 @@ public abstract class BKException extends Exception { int ClientClosedException = -19; int LedgerExistException = -20; int AddEntryQuorumTimeoutException = -21; + int DuplicateEntryIdException = -22; int IllegalOpException = -100; int LedgerFencedException = -101; @@ -192,6 +195,8 @@ public abstract class BKException extends Exception { return "Bookie protocol version on server is incompatible with client"; case Code.MetadataVersionException: return "Bad ledger metadata version"; + case Code.DuplicateEntryIdException: + return "Attempted to add Duplicate entryId"; case Code.LedgerFencedException: return "Ledger has been fenced off. Some other client must have opened it to read"; case Code.UnauthorizedAccessException: @@ -261,6 +266,12 @@ public abstract class BKException extends Exception { } } + public static class BKDuplicateEntryIdException extends BKException { + public BKDuplicateEntryIdException() { + super(Code.DuplicateEntryIdException); + } + } + public static class BKUnexpectedConditionException extends BKException { public BKUnexpectedConditionException() { super(Code.UnexpectedConditionException); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index 6bb71fa..ed744b0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -537,6 +537,99 @@ public class BookKeeper { } /** + * Synchronous call to create ledger. 
+ * Creates a new ledger and returns a {@link LedgerHandleAdv} which can accept entryId. + * Parameters must match those of + * {@link #asyncCreateLedgerAdv(int, int, int, DigestType, byte[], + * AsyncCallback.CreateCallback, Object)} + * + * @param ensSize + * @param writeQuorumSize + * @param ackQuorumSize + * @param digestType + * @param passwd + * @return a handle to the newly created ledger + * @throws InterruptedException + * @throws BKException + */ + public LedgerHandle createLedgerAdv(int ensSize, int writeQuorumSize, int ackQuorumSize, + DigestType digestType, byte passwd[]) + throws InterruptedException, BKException { + SyncCounter counter = new SyncCounter(); + counter.inc(); + /* + * Calls asynchronous version + */ + asyncCreateLedgerAdv(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, + new SyncCreateCallback(), counter); + + /* + * Wait + */ + counter.block(0); + if (counter.getrc() != BKException.Code.OK) { + LOG.error("Error while creating ledger : {}", counter.getrc()); + throw BKException.create(counter.getrc()); + } else if (counter.getLh() == null) { + LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation"); + throw BKException.create(BKException.Code.UnexpectedConditionException); + } + + return counter.getLh(); + } + + /** + * Creates a new ledger asynchronously and returns {@link LedgerHandleAdv} + * which can accept entryId. Ledgers created with this call have the ability to accept + * a separate write quorum and ack quorum size. The write quorum must not be smaller than + * the ack quorum. + * + * Separating the write and the ack quorum allows the BookKeeper client to continue + * writing when a bookie has failed but the failure has not yet been detected. Detecting + * a bookie has failed can take a number of seconds, as configured by the read timeout + * {@link ClientConfiguration#getReadTimeout()}. Once the bookie failure is detected, + * that bookie will be removed from the ensemble. 
+ * + * The other parameters match those of {@link #asyncCreateLedger(int, int, DigestType, byte[], + * AsyncCallback.CreateCallback, Object)} + * + * @param ensSize + * number of bookies over which to stripe entries + * @param writeQuorumSize + * number of bookies each entry will be written to + * @param ackQuorumSize + * number of bookies which must acknowledge an entry before the call is completed + * @param digestType + * digest type, either MAC or CRC32 + * @param passwd + * password + * @param cb + * createCallback implementation + * @param ctx + * optional control object + */ + public void asyncCreateLedgerAdv(final int ensSize, + final int writeQuorumSize, + final int ackQuorumSize, + final DigestType digestType, + final byte[] passwd, final CreateCallback cb, final Object ctx) { + if (writeQuorumSize < ackQuorumSize) { + throw new IllegalArgumentException("Write quorum must be larger than ack quorum"); + } + closeLock.readLock().lock(); + try { + if (closed) { + cb.createComplete(BKException.Code.ClientClosedException, null, ctx); + return; + } + new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize, + ackQuorumSize, digestType, passwd, cb, ctx).initiateAdv(); + } finally { + closeLock.readLock().unlock(); + } + } + + /** * Open existing ledger asynchronously for reading. 
* * Opening a ledger with this method invokes fencing and recovery on the ledger http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java index 7c181b5..6f794d0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java @@ -54,6 +54,7 @@ class LedgerCreateOp implements GenericCallback<Void> { DigestType digestType; long startTime; OpStatsLogger createOpLogger; + boolean adv = false; /** * Constructor @@ -137,6 +138,14 @@ class LedgerCreateOp implements GenericCallback<Void> { } /** + * Initiates the operation to return LedgerHandleAdv. + */ + public void initiateAdv() { + this.adv = true; + initiate(); + } + + /** * Callback when created ledger. 
*/ @Override @@ -151,7 +160,11 @@ class LedgerCreateOp implements GenericCallback<Void> { } try { - lh = new LedgerHandle(bk, ledgerId, metadata, digestType, passwd); + if (adv) { + lh = new LedgerHandleAdv(bk, ledgerId, metadata, digestType, passwd); + } else { + lh = new LedgerHandle(bk, ledgerId, metadata, digestType, passwd); + } } catch (GeneralSecurityException e) { LOG.error("Security exception while creating ledger: " + ledgerId, e); createComplete(BKException.Code.DigestNotInitializedException, null); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 36faac4..61cc603 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -78,7 +78,7 @@ public class LedgerHandle { final static public long INVALID_ENTRY_ID = BookieProtocol.INVALID_ENTRY_ID; final AtomicInteger blockAddCompletions = new AtomicInteger(0); - final Queue<PendingAddOp> pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>(); + Queue<PendingAddOp> pendingAddOps; final Counter ensembleChangeCounter; final Counter lacUpdateHitsCounter; @@ -89,6 +89,7 @@ public class LedgerHandle { throws GeneralSecurityException, NumberFormatException { this.bk = bk; this.metadata = metadata; + this.pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>(); if (metadata.isClosed()) { lastAddConfirmed = lastAddPushed = metadata.getLastEntryId(); @@ -473,6 +474,23 @@ public class LedgerHandle { } /** + * Add entry synchronously to an open ledger. 
This can be used only with + * {@link LedgerHandleAdv} returned through ledgers created with {@link + * BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}. + * + * + * @param entryId + * entryId to be added + * @param data + * array of bytes to be written to the ledger + * @return the entryId of the new inserted entry + */ + public long addEntry(final long entryId, byte[] data) throws InterruptedException, BKException { + LOG.error("To use this feature Ledger must be created with createLedgerAdv interface."); + throw BKException.create(BKException.Code.IllegalOpException); + } + + /** * Add entry synchronously to an open ledger. * * @param data @@ -502,6 +520,27 @@ public class LedgerHandle { } /** + * Add entry synchronously to an open ledger. This can be used only with + * {@link LedgerHandleAdv} returned through ledgers created with {@link + * BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}. + * + * @param entryId + * entryId to be added. + * @param data + * array of bytes to be written to the ledger + * @param offset + * offset from which to take bytes from data + * @param length + * number of bytes to take from data + * @return entryId + */ + public long addEntry(final long entryId, byte[] data, int offset, int length) throws InterruptedException, + BKException { + LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface."); + throw BKException.create(BKException.Code.IllegalOpException); + } + + /** * Add entry asynchronously to an open ledger. * * @param data @@ -517,6 +556,26 @@ public class LedgerHandle { } /** + * Add entry asynchronously to an open ledger. This can be used only with + * {@link LedgerHandleAdv} returned through ledgers created with {@link + * BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}. 
+ * + * @param entryId + * entryId to be added + * @param data + * array of bytes to be written + * @param cb + * object implementing callbackinterface + * @param ctx + * some control object + */ + public void asyncAddEntry(final long entryId, final byte[] data, final AddCallback cb, final Object ctx) + throws BKException { + LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface."); + cb.addComplete(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, ctx); + } + + /** * Add entry asynchronously to an open ledger, using an offset and range. * * @param data @@ -539,8 +598,35 @@ public class LedgerHandle { } /** - * Make a recovery add entry request. Recovery adds can add to a ledger even if - * it has been fenced. + * Add entry asynchronously to an open ledger, using an offset and range. + * This can be used only with {@link LedgerHandleAdv} returned through + * ledgers created with {@link BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}. + * + * @param entryId + * entryId of the entry to add. + * @param data + * array of bytes to be written + * @param offset + * offset from which to take bytes from data + * @param length + * number of bytes to take from data + * @param cb + * object implementing callbackinterface + * @param ctx + * some control object + * @throws ArrayIndexOutOfBoundsException + * if offset or length is negative or offset and length sum to a + * value higher than the length of data. + */ + public void asyncAddEntry(final long entryId, final byte[] data, final int offset, final int length, + final AddCallback cb, final Object ctx) throws BKException { + LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface."); + cb.addComplete(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, ctx); + } + + /** + * Make a recovery add entry request. Recovery adds can add to a ledger even + * if it has been fenced. 
* * This is only valid for bookie and ledger recovery, which may need to replicate * entries to a quorum of bookies to ensure data safety. @@ -553,13 +639,14 @@ public class LedgerHandle { doAsyncAddEntry(op, data, offset, length, cb, ctx); } - private void doAsyncAddEntry(final PendingAddOp op, final byte[] data, final int offset, final int length, - final AddCallback cb, final Object ctx) { + void doAsyncAddEntry(final PendingAddOp op, final byte[] data, final int offset, final int length, + final AddCallback cb, final Object ctx) { + if (offset < 0 || length < 0 || (offset + length) > data.length) { throw new ArrayIndexOutOfBoundsException( - "Invalid values for offset("+offset - +") or length("+length+")"); + "Invalid values for offset(" +offset + +") or length("+length+")"); } throttler.acquire(); @@ -592,6 +679,7 @@ public class LedgerHandle { cb.addComplete(BKException.Code.LedgerClosedException, LedgerHandle.this, INVALID_ENTRY_ID, ctx); } + @Override public String toString() { return String.format("AsyncAddEntryToClosedLedger(lid=%d)", ledgerId); @@ -701,6 +789,7 @@ public class LedgerHandle { } ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback() { AtomicBoolean completed = new AtomicBoolean(false); + @Override public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) { if (rc == BKException.Code.OK) { @@ -842,11 +931,17 @@ public class LedgerHandle { // Start from the head of the queue and proceed while there are // entries that have had all their responses come back PendingAddOp pendingAddOp; + while ((pendingAddOp = pendingAddOps.peek()) != null && blockAddCompletions.get() == 0) { if (!pendingAddOp.completed) { return; } + // Check if it is the next entry in the sequence. 
+ if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != lastAddConfirmed + 1) { + LOG.debug("Head of the queue entryId: {} is not lac: {} + 1", pendingAddOp.entryId, lastAddConfirmed); + return; + } pendingAddOps.remove(); lastAddConfirmed = pendingAddOp.entryId; pendingAddOp.submitCallback(BKException.Code.OK); @@ -1219,7 +1314,7 @@ public class LedgerHandle { } } - private static class SyncAddCallback implements AddCallback { + static class SyncAddCallback implements AddCallback { long entryId = -1; /** http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java new file mode 100644 index 0000000..00fcfa7 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java @@ -0,0 +1,214 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.bookkeeper.client; + +import java.io.Serializable; +import java.security.GeneralSecurityException; +import java.util.Comparator; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionException; + +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.util.SafeRunnable; +import org.jboss.netty.buffer.ChannelBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Ledger Advanced handle extends {@link LedgerHandle} to provide API to add entries with + * user supplied entryIds. Through this interface Ledger Length may not be accurate while the + * ledger is being written. + */ +public class LedgerHandleAdv extends LedgerHandle { + final static Logger LOG = LoggerFactory.getLogger(LedgerHandleAdv.class); + + static class PendingOpsComparator implements Comparator<PendingAddOp>, Serializable { + public int compare(PendingAddOp o1, PendingAddOp o2) { + return Long.compare(o1.entryId, o2.entryId); + } + } + + LedgerHandleAdv(BookKeeper bk, long ledgerId, LedgerMetadata metadata, DigestType digestType, byte[] password) + throws GeneralSecurityException, NumberFormatException { + super(bk, ledgerId, metadata, digestType, password); + pendingAddOps = new PriorityBlockingQueue<PendingAddOp>(10, new PendingOpsComparator()); + } + + + /** + * Add entry synchronously to an open ledger. + * + * @param entryId + * entryId of the entry to add + * @param data + * array of bytes to be written to the ledger + * @return + * entryId that is just created. + */ + @Override + public long addEntry(final long entryId, byte[] data) throws InterruptedException, BKException { + + return addEntry(entryId, data, 0, data.length); + + } + + /** + * Add entry synchronously to an open ledger. 
+ * + * @param entryId + * entryId of the entry to add + * @param data + * array of bytes to be written to the ledger + * @param offset + * offset from which to take bytes from data + * @param length + * number of bytes to take from data + * @return The entryId of newly inserted entry. + */ + @Override + public long addEntry(final long entryId, byte[] data, int offset, int length) throws InterruptedException, + BKException { + LOG.debug("Adding entry {}", data); + + SyncCounter counter = new SyncCounter(); + counter.inc(); + + SyncAddCallback callback = new SyncAddCallback(); + asyncAddEntry(entryId, data, offset, length, callback, counter); + + counter.block(0); + + if (counter.getrc() != BKException.Code.OK) { + throw BKException.create(counter.getrc()); + } + return callback.entryId; + } + + /** + * Add entry asynchronously to an open ledger. + * + * @param entryId + * entryId of the entry to add + * @param data + * array of bytes to be written + * @param cb + * object implementing callbackinterface + * @param ctx + * some control object + */ + @Override + public void asyncAddEntry(long entryId, byte[] data, AddCallback cb, Object ctx) throws BKException { + asyncAddEntry(entryId, data, 0, data.length, cb, ctx); + } + + /** + * Add entry asynchronously to an open ledger, using an offset and range. + * + * @param entryId + * entryId of the entry to add + * @param data + * array of bytes to be written + * @param offset + * offset from which to take bytes from data + * @param length + * number of bytes to take from data + * @param cb + * object implementing callbackinterface + * @param ctx + * some control object + * @throws ArrayIndexOutOfBoundsException + * if offset or length is negative or offset and length sum to a + * value higher than the length of data. 
+ */ + + public void asyncAddEntry(final long entryId, final byte[] data, final int offset, final int length, + final AddCallback cb, final Object ctx) { + PendingAddOp op = new PendingAddOp(this, cb, ctx); + op.setEntryId(entryId); + if ((entryId <= this.lastAddConfirmed) || pendingAddOps.contains(op)) { + LOG.error("Trying to re-add duplicate entryid:{}", entryId); + cb.addComplete(BKException.Code.DuplicateEntryIdException, + LedgerHandleAdv.this, entryId, ctx); + return; + } + pendingAddOps.add(op); + + doAsyncAddEntry(op, data, offset, length, cb, ctx); + } + + /** + * Overriding part is mostly around setting entryId. + * Though there may be some code duplication, Choose to have the override routine so the control flow is + * unaltered in the base class. + */ + @Override + void doAsyncAddEntry(final PendingAddOp op, final byte[] data, final int offset, final int length, + final AddCallback cb, final Object ctx) { + if (offset < 0 || length < 0 + || (offset + length) > data.length) { + throw new ArrayIndexOutOfBoundsException( + "Invalid values for offset("+offset + +") or length("+length+")"); + } + throttler.acquire(); + + if (metadata.isClosed()) { + // make sure the callback is triggered in main worker pool + try { + bk.mainWorkerPool.submit(new SafeRunnable() { + @Override + public void safeRun() { + LOG.warn("Attempt to add to closed ledger: {}", ledgerId); + cb.addComplete(BKException.Code.LedgerClosedException, + LedgerHandleAdv.this, op.getEntryId(), ctx); + } + @Override + public String toString() { + return String.format("AsyncAddEntryToClosedLedger(lid=%d)", ledgerId); + } + }); + } catch (RejectedExecutionException e) { + cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException), + LedgerHandleAdv.this, op.getEntryId(), ctx); + } + return; + } + + try { + final long currentLength = addToLength(length); + + bk.mainWorkerPool.submit(new SafeRunnable() { + @Override + public void safeRun() { + ChannelBuffer toSend = 
macManager.computeDigestAndPackageForSending( + op.getEntryId(), lastAddConfirmed, currentLength, data, offset, length); + op.initiate(toSend, length); + } + }); + } catch (RejectedExecutionException e) { + cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException), + LedgerHandleAdv.this, op.getEntryId(), ctx); + } + } + +} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index 4034c35..bc487f6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -91,6 +91,10 @@ class PendingAddOp implements WriteCallback, TimerTask { writeSet = new HashSet<Integer>(lh.distributionSchedule.getWriteSet(entryId)); } + long getEntryId() { + return this.entryId; + } + void sendWriteRequest(int bookieIndex) { int flags = isRecoveryAdd ? 
BookieProtocol.FLAG_RECOVERY_ADD : BookieProtocol.FLAG_NONE; @@ -249,4 +253,17 @@ class PendingAddOp implements WriteCallback, TimerTask { return sb.toString(); } + @Override + public int hashCode() { + return (int) entryId; + } + + @Override + public boolean equals(Object o) { + if (o instanceof PendingAddOp) { + return (this.entryId == ((PendingAddOp)o).entryId); + } + return (this == o); + } + } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java index 7b77c48..692c480 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java @@ -131,6 +131,48 @@ public class BookieWriteLedgerTest extends } /** + * Verify the functionality of Advanced Ledger which returns + * LedgerHandleAdv. LedgerHandleAdv takes entryId for addEntry, and let + * user manage entryId allocation. 
+ * + * @throws Exception + */ + @Test(timeout = 60000) + public void testLedgerCreateAdv() throws Exception { + // Create a ledger + lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword); + for (int i = 0; i < numEntriesToWrite; i++) { + ByteBuffer entry = ByteBuffer.allocate(4); + entry.putInt(rng.nextInt(maxInt)); + entry.position(0); + + entries1.add(entry.array()); + lh.addEntry(i, entry.array()); + } + // Start one more bookies + startNewBookie(); + + // Shutdown one bookie in the last ensemble and continue writing + ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next() + .getValue(); + killBookie(ensemble.get(0)); + + int i = numEntriesToWrite; + numEntriesToWrite = numEntriesToWrite + 50; + for (; i < numEntriesToWrite; i++) { + ByteBuffer entry = ByteBuffer.allocate(4); + entry.putInt(rng.nextInt(maxInt)); + entry.position(0); + + entries1.add(entry.array()); + lh.addEntry(i, entry.array()); + } + + readEntries(lh, entries1); + lh.close(); + } + + /** * Verify asynchronous writing when few bookie failures in last ensemble. 
*/ @Test(timeout=60000) @@ -203,8 +245,327 @@ public class BookieWriteLedgerTest extends lh2.close(); } - private void readEntries(LedgerHandle lh, ArrayList<byte[]> entries) - throws InterruptedException, BKException { + /** + * Verify Advanced asynchronous writing with entryIds in reverse order + */ + @Test(timeout = 60000) + public void testLedgerCreateAdvWithAsyncWritesWithBookieFailures() throws Exception { + // Create ledgers + lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword); + lh2 = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword); + + LOG.info("Ledger ID-1: " + lh.getId()); + LOG.info("Ledger ID-2: " + lh2.getId()); + SyncObj syncObj1 = new SyncObj(); + SyncObj syncObj2 = new SyncObj(); + for (int i = numEntriesToWrite - 1; i >= 0; i--) { + ByteBuffer entry = ByteBuffer.allocate(4); + entry.putInt(rng.nextInt(maxInt)); + entry.position(0); + try { + entries1.add(0, entry.array()); + entries2.add(0, entry.array()); + } catch (Exception e) { + e.printStackTrace(); + } + lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj1); + lh2.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj2); + } + // Start One more bookie and shutdown one from last ensemble before reading + startNewBookie(); + ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next() + .getValue(); + killBookie(ensemble.get(0)); + + // Wait for all entries to be acknowledged for the first ledger + synchronized (syncObj1) { + while (syncObj1.counter < numEntriesToWrite) { + syncObj1.wait(); + } + assertEquals(BKException.Code.OK, syncObj1.rc); + } + // Wait for all entries to be acknowledged for the second ledger + synchronized (syncObj2) { + while (syncObj2.counter < numEntriesToWrite) { + syncObj2.wait(); + } + assertEquals(BKException.Code.OK, syncObj2.rc); + } + + // Reading ledger till the last entry + readEntries(lh, entries1); + readEntries(lh2, entries2); + lh.close(); + lh2.close(); + } + 
+    /**
+     * Verify advanced asynchronous writing with entryIds in pseudo random order
+     * while, half-way through the writes, a new bookie is started and one bookie
+     * of the first ledger's ensemble is killed.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testLedgerCreateAdvWithRandomAsyncWritesWithBookieFailuresBetweenWrites() throws Exception {
+        // Create ledgers
+        lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+        lh2 = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+
+        LOG.info("Ledger ID-1: " + lh.getId());
+        LOG.info("Ledger ID-2: " + lh2.getId());
+        SyncObj syncObj1 = new SyncObj();
+        SyncObj syncObj2 = new SyncObj();
+        int batchSize = 5;
+        int i, j;
+
+        // Fill the result buffers first; both ledgers receive the same payloads
+        for (i = 0; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            entries1.add(0, entry.array());
+            entries2.add(0, entry.array());
+        }
+
+        // Issue the writes in a strided order: i, i + batchSize, i + 2 * batchSize, ...
+        for (i = 0; i < batchSize; i++) {
+            for (j = i; j < numEntriesToWrite; j = j + batchSize) {
+                byte[] entry1 = entries1.get(j);
+                byte[] entry2 = entries2.get(j);
+                lh.asyncAddEntry(j, entry1, 0, entry1.length, this, syncObj1);
+                lh2.asyncAddEntry(j, entry2, 0, entry2.length, this, syncObj2);
+                if (j == numEntriesToWrite / 2) {
+                    // Start one more bookie and shut down the first bookie of the first
+                    // ensemble listed in lh's metadata once entry numEntriesToWrite / 2 is issued
+                    startNewBookie();
+                    ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet()
+                            .iterator().next().getValue();
+                    killBookie(ensemble.get(0));
+                }
+            }
+        }
+
+        // Wait for all entries to be acknowledged for the first ledger
+        synchronized (syncObj1) {
+            while (syncObj1.counter < numEntriesToWrite) {
+                syncObj1.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj1.rc);
+        }
+        // Wait for all entries to be acknowledged for the second ledger
+        synchronized (syncObj2) {
+            while (syncObj2.counter < numEntriesToWrite) {
+                syncObj2.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj2.rc);
+        }
+
+        // Reading ledger till the last entry
+        readEntries(lh, entries1);
+        readEntries(lh2, entries2);
+        lh.close();
+        lh2.close();
+    }
+
+    /**
+     * Verify advanced asynchronous writing with entryIds in pseudo random order
+     * when, right after all writes have been issued, a new bookie is started and
+     * one bookie of the first ledger's ensemble is killed.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testLedgerCreateAdvWithRandomAsyncWritesWithBookieFailures() throws Exception {
+        // Create ledgers
+        lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+        lh2 = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+
+        LOG.info("Ledger ID-1: " + lh.getId());
+        LOG.info("Ledger ID-2: " + lh2.getId());
+        SyncObj syncObj1 = new SyncObj();
+        SyncObj syncObj2 = new SyncObj();
+        int batchSize = 5;
+        int i, j;
+
+        // Fill the result buffers first; both ledgers receive the same payloads
+        for (i = 0; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            entries1.add(0, entry.array());
+            entries2.add(0, entry.array());
+        }
+
+        // Issue the writes in a strided order: i, i + batchSize, i + 2 * batchSize, ...
+        for (i = 0; i < batchSize; i++) {
+            for (j = i; j < numEntriesToWrite; j = j + batchSize) {
+                byte[] entry1 = entries1.get(j);
+                byte[] entry2 = entries2.get(j);
+                lh.asyncAddEntry(j, entry1, 0, entry1.length, this, syncObj1);
+                lh2.asyncAddEntry(j, entry2, 0, entry2.length, this, syncObj2);
+            }
+        }
+        // Start one more bookie and shut down the first bookie of the first ensemble
+        // listed in lh's metadata before waiting for the acknowledgements
+        startNewBookie();
+        ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+                .getValue();
+        killBookie(ensemble.get(0));
+
+        // Wait for all entries to be acknowledged for the first ledger
+        synchronized (syncObj1) {
+            while (syncObj1.counter < numEntriesToWrite) {
+                syncObj1.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj1.rc);
+        }
+        // Wait for all entries to be acknowledged for the second ledger
+        synchronized (syncObj2) {
+            while (syncObj2.counter < numEntriesToWrite) {
+                syncObj2.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj2.rc);
+        }
+
+        // Reading ledger till the last entry
+        readEntries(lh, entries1);
+        readEntries(lh2, entries2);
+        lh.close();
+        lh2.close();
+    }
+
+    /**
+     * Skips one entry while writing in descending entryId order, closes and
+     * reopens the ledger, and asserts that lastAddConfirmed is right before
+     * the skipped entryId.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testLedgerCreateAdvWithSkipEntries() throws Exception {
+        long ledgerId;
+        SyncObj syncObj1 = new SyncObj();
+
+        // Create a ledger
+        lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+        // Save ledgerId to reopen the ledger
+        ledgerId = lh.getId();
+        LOG.info("Ledger ID: " + ledgerId);
+        int skipEntryId = rng.nextInt(numEntriesToWrite - 1);
+        for (int i = numEntriesToWrite - 1; i >= 0; i--) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            entries1.add(0, entry.array());
+            if (i == skipEntryId) {
+                LOG.info("Skipping entry:{}", skipEntryId);
+                continue;
+            }
+            lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj1);
+        }
+        // Wait for skipEntryId acknowledgements, i.e. as many as there are entryIds below
+        // the skipped one.
+        // NOTE(review): presumably entries above the gap are not acknowledged while the gap
+        // exists - confirm against LedgerHandleAdv.
+        synchronized (syncObj1) {
+            while (syncObj1.counter < skipEntryId) {
+                syncObj1.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj1.rc);
+        }
+        // Close the ledger
+        lh.close();
+        // Open the ledger
+        lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+        assertEquals(skipEntryId - 1, lh.lastAddConfirmed);
+        lh.close();
+    }
+
+    /**
+     * Verify the functionality LedgerHandleAdv addEntry with duplicate entryIds
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testLedgerCreateAdvSyncAddDuplicateEntryIds() throws Exception {
+        // Create a ledger
+        lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+        LOG.info("Ledger ID: " + lh.getId());
+        for (int i = 0; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            entries1.add(entry.array());
+            lh.addEntry(i, entry.array());
+            entry.position(0);
+        }
+        readEntries(lh, entries1);
+
+        // Pick an entryId that has already been written above
+        int dupEntryId = rng.nextInt(numEntriesToWrite - 1);
+
+        try {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            lh.addEntry(dupEntryId, entry.array());
+            fail("Expected exception not thrown");
+        } catch (BKException e) {
+            // This test expects DuplicateEntryIdException
+            assertEquals(BKException.Code.DuplicateEntryIdException, e.getCode());
+        }
+        lh.close();
+    }
+
+    /**
+     * Verify the functionality LedgerHandleAdv asyncAddEntry with duplicate
+     * entryIds
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testLedgerCreateAdvSyncAsyncAddDuplicateEntryIds() throws Exception {
+        SyncObj syncObj1 = new SyncObj();
+        SyncObj syncObj2 = new SyncObj();
+        // Number of duplicate writes issued so far; all of them report to syncObj2
+        int expectedDuplicates = 0;
+
+        // Create a ledger
+        lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+        long ledgerId = lh.getId();
+        LOG.info("Ledger ID: " + ledgerId);
+        for (int i = numEntriesToWrite - 1; i >= 0; i--) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            entries1.add(0, entry.array());
+            lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj1);
+            if (rng.nextBoolean()) {
+                // Attempt to write the same entry
+                expectedDuplicates++;
+                lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj2);
+                synchronized (syncObj2) {
+                    // syncObj2 is shared by every duplicate attempt, so wait for the callback of
+                    // this attempt; waiting for "counter < 1" would only ever block on the first
+                    // duplicate and let later iterations assert on a stale rc. Assumes the add
+                    // callback bumps counter once per completed add, as the waits on syncObj1 do.
+                    while (syncObj2.counter < expectedDuplicates) {
+                        syncObj2.wait();
+                    }
+                    assertEquals(BKException.Code.DuplicateEntryIdException, syncObj2.rc);
+                }
+            }
+        }
+        // Wait for all entries to be acknowledged for the first ledger
+        synchronized (syncObj1) {
+            while (syncObj1.counter < numEntriesToWrite) {
+                syncObj1.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj1.rc);
+        }
+        // Close the ledger
+        lh.close();
+    }
+
     private void readEntries(LedgerHandle lh,
ArrayList<byte[]> entries) throws InterruptedException, BKException { ls = lh.readEntries(0, numEntriesToWrite - 1); int index = 0; while (ls.hasMoreElements()) {