Repository: bookkeeper
Updated Branches:
  refs/heads/master 8a3922e00 -> 355ddbc7c


BOOKKEEPER-867: New Client API to allow applications pass-in EntryId. 
(Venkateswararao Jujjuri via sijie)


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/355ddbc7
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/355ddbc7
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/355ddbc7

Branch: refs/heads/master
Commit: 355ddbc7c13e39c49a58371d467e87aa80f696d7
Parents: 8a3922e
Author: Sijie Guo <si...@apache.org>
Authored: Thu Oct 22 00:22:36 2015 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Thu Oct 22 00:22:36 2015 -0700

----------------------------------------------------------------------
 .../apache/bookkeeper/client/BKException.java   |  11 +
 .../apache/bookkeeper/client/BookKeeper.java    |  93 +++++
 .../bookkeeper/client/LedgerCreateOp.java       |  15 +-
 .../apache/bookkeeper/client/LedgerHandle.java  | 111 +++++-
 .../bookkeeper/client/LedgerHandleAdv.java      | 214 +++++++++++
 .../apache/bookkeeper/client/PendingAddOp.java  |  17 +
 .../client/BookieWriteLedgerTest.java           | 365 ++++++++++++++++++-
 7 files changed, 815 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index 3991085..b2355cd 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -96,6 +96,8 @@ public abstract class BKException extends Exception {
             return new BKIllegalOpException();
         case Code.AddEntryQuorumTimeoutException:
             return new BKAddEntryQuorumTimeoutException();
+        case Code.DuplicateEntryIdException:
+            return new BKDuplicateEntryIdException();
         default:
             return new BKUnexpectedConditionException();
         }
@@ -128,6 +130,7 @@ public abstract class BKException extends Exception {
         int ClientClosedException = -19;
         int LedgerExistException = -20;
         int AddEntryQuorumTimeoutException = -21;
+        int DuplicateEntryIdException = -22;
 
         int IllegalOpException = -100;
         int LedgerFencedException = -101;
@@ -192,6 +195,8 @@ public abstract class BKException extends Exception {
             return "Bookie protocol version on server is incompatible with 
client";
         case Code.MetadataVersionException:
             return "Bad ledger metadata version";
+        case Code.DuplicateEntryIdException:
+            return "Attempted to add Duplicate entryId";
         case Code.LedgerFencedException:
             return "Ledger has been fenced off. Some other client must have 
opened it to read";
         case Code.UnauthorizedAccessException:
@@ -261,6 +266,12 @@ public abstract class BKException extends Exception {
         }
     }
 
+    public static class BKDuplicateEntryIdException extends BKException {
+        public BKDuplicateEntryIdException() {
+            super(Code.DuplicateEntryIdException);
+        }
+    }
+
     public static class BKUnexpectedConditionException extends BKException {
         public BKUnexpectedConditionException() {
             super(Code.UnexpectedConditionException);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 6bb71fa..ed744b0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -537,6 +537,99 @@ public class BookKeeper {
     }
 
     /**
+     * Synchronous call to create ledger.
+     * Creates a new ledger, blocking until creation completes, and returns {@link LedgerHandleAdv} 
which can accept entryId.
+     * Parameters must match those of
+     * {@link #asyncCreateLedgerAdv(int, int, int, DigestType, byte[],
+     *                           AsyncCallback.CreateCallback, Object)}
+     *
+     * @param ensSize
+     * @param writeQuorumSize
+     * @param ackQuorumSize
+     * @param digestType
+     * @param passwd
+     * @return a handle to the newly created ledger
+     * @throws InterruptedException
+     * @throws BKException
+     */
+    public LedgerHandle createLedgerAdv(int ensSize, int writeQuorumSize, int 
ackQuorumSize,
+                                        DigestType digestType, byte passwd[])
+            throws InterruptedException, BKException {
+        SyncCounter counter = new SyncCounter();
+        counter.inc();
+        /*
+         * Calls asynchronous version
+         */
+        asyncCreateLedgerAdv(ensSize, writeQuorumSize, ackQuorumSize, 
digestType, passwd,
+                             new SyncCreateCallback(), counter);
+
+        /*
+         * Wait
+         */
+        counter.block(0);
+        if (counter.getrc() != BKException.Code.OK) {
+            LOG.error("Error while creating ledger : {}", counter.getrc());
+            throw BKException.create(counter.getrc());
+        } else if (counter.getLh() == null) {
+            LOG.error("Unexpected condition : no ledger handle returned for a 
success ledger creation");
+            throw 
BKException.create(BKException.Code.UnexpectedConditionException);
+        }
+
+        return counter.getLh();
+    }
+
+    /**
+     * Creates a new ledger asynchronously and returns {@link LedgerHandleAdv}
+     * which can accept entryId.  Ledgers created with this call have the ability 
to accept
+     * a separate write quorum and ack quorum size. The write quorum must be 
at least
+     * as large as the ack quorum.
+     *
+     * Separating the write and the ack quorum allows the BookKeeper client to 
continue
+     * writing when a bookie has failed but the failure has not yet been 
detected. Detecting
+     * a bookie has failed can take a number of seconds, as configured by the 
read timeout
+     * {@link ClientConfiguration#getReadTimeout()}. Once the bookie failure 
is detected,
+     * that bookie will be removed from the ensemble.
+     *
+     * The other parameters match those of {@link #asyncCreateLedger(int, int, 
DigestType, byte[],
+     *                                      AsyncCallback.CreateCallback, 
Object)}
+     *
+     * @param ensSize
+     *          number of bookies over which to stripe entries
+     * @param writeQuorumSize
+     *          number of bookies each entry will be written to
+     * @param ackQuorumSize
+     *          number of bookies which must acknowledge an entry before the 
call is completed
+     * @param digestType
+     *          digest type, either MAC or CRC32
+     * @param passwd
+     *          password
+     * @param cb
+     *          createCallback implementation
+     * @param ctx
+     *          optional control object
+     */
+    public void asyncCreateLedgerAdv(final int ensSize,
+                                     final int writeQuorumSize,
+                                     final int ackQuorumSize,
+                                     final DigestType digestType,
+                                     final byte[] passwd, final CreateCallback 
cb, final Object ctx) {
+        if (writeQuorumSize < ackQuorumSize) {
+            throw new IllegalArgumentException("Write quorum must be larger 
than ack quorum");
+        }
+        closeLock.readLock().lock();
+        try {
+            if (closed) {
+                cb.createComplete(BKException.Code.ClientClosedException, 
null, ctx);
+                return;
+            }
+            new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
+                               ackQuorumSize, digestType, passwd, cb, 
ctx).initiateAdv();
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
+    /**
      * Open existing ledger asynchronously for reading.
      *
      * Opening a ledger with this method invokes fencing and recovery on the 
ledger

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 7c181b5..6f794d0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -54,6 +54,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
     DigestType digestType;
     long startTime;
     OpStatsLogger createOpLogger;
+    boolean adv = false;
 
     /**
      * Constructor
@@ -137,6 +138,14 @@ class LedgerCreateOp implements GenericCallback<Void> {
     }
 
     /**
+     * Initiates the operation to return LedgerHandleAdv.
+     */
+    public void initiateAdv() {
+        this.adv = true;
+        initiate();
+    }
+
+    /**
      * Callback when created ledger.
      */
     @Override
@@ -151,7 +160,11 @@ class LedgerCreateOp implements GenericCallback<Void> {
         }
 
         try {
-            lh = new LedgerHandle(bk, ledgerId, metadata, digestType, passwd);
+            if (adv) {
+                lh = new LedgerHandleAdv(bk, ledgerId, metadata, digestType, 
passwd);
+            } else {
+                lh = new LedgerHandle(bk, ledgerId, metadata, digestType, 
passwd);
+            }
         } catch (GeneralSecurityException e) {
             LOG.error("Security exception while creating ledger: " + ledgerId, 
e);
             createComplete(BKException.Code.DigestNotInitializedException, 
null);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 36faac4..61cc603 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -78,7 +78,7 @@ public class LedgerHandle {
     final static public long INVALID_ENTRY_ID = 
BookieProtocol.INVALID_ENTRY_ID;
 
     final AtomicInteger blockAddCompletions = new AtomicInteger(0);
-    final Queue<PendingAddOp> pendingAddOps = new 
ConcurrentLinkedQueue<PendingAddOp>();
+    Queue<PendingAddOp> pendingAddOps;
 
     final Counter ensembleChangeCounter;
     final Counter lacUpdateHitsCounter;
@@ -89,6 +89,7 @@ public class LedgerHandle {
             throws GeneralSecurityException, NumberFormatException {
         this.bk = bk;
         this.metadata = metadata;
+        this.pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
 
         if (metadata.isClosed()) {
             lastAddConfirmed = lastAddPushed = metadata.getLastEntryId();
@@ -473,6 +474,23 @@ public class LedgerHandle {
     }
 
     /**
+     * Add entry synchronously to an open ledger. This can be used only with
+     * {@link LedgerHandleAdv} returned through ledgers created with {@link
+     * BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}.
+     *
+     *
+     * @param entryId
+     *            entryId to be added
+     * @param data
+     *            array of bytes to be written to the ledger
+     * @return the entryId of the new inserted entry
+     */
+    public long addEntry(final long entryId, byte[] data) throws 
InterruptedException, BKException {
+        LOG.error("To use this feature Ledger must be created with 
createLedgerAdv interface.");
+        throw BKException.create(BKException.Code.IllegalOpException);
+    }
+
+    /**
      * Add entry synchronously to an open ledger.
      *
      * @param data
@@ -502,6 +520,27 @@ public class LedgerHandle {
     }
 
     /**
+     * Add entry synchronously to an open ledger. This can be used only with
+     * {@link LedgerHandleAdv} returned through ledgers created with {@link
+     * BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}.
+     *
+     * @param entryId
+     *            entryId to be added.
+     * @param data
+     *            array of bytes to be written to the ledger
+     * @param offset
+     *            offset from which to take bytes from data
+     * @param length
+     *            number of bytes to take from data
+     * @return entryId
+     */
+    public long addEntry(final long entryId, byte[] data, int offset, int 
length) throws InterruptedException,
+            BKException {
+        LOG.error("To use this feature Ledger must be created with 
createLedgerAdv() interface.");
+        throw BKException.create(BKException.Code.IllegalOpException);
+    }
+
+    /**
      * Add entry asynchronously to an open ledger.
      *
      * @param data
@@ -517,6 +556,26 @@ public class LedgerHandle {
     }
 
     /**
+     * Add entry asynchronously to an open ledger. This can be used only with
+     * {@link LedgerHandleAdv} returned through ledgers created with {@link
+     * BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}.
+     *
+     * @param entryId
+     *            entryId to be added
+     * @param data
+     *            array of bytes to be written
+     * @param cb
+     *            object implementing callback interface
+     * @param ctx
+     *            some control object
+     */
+    public void asyncAddEntry(final long entryId, final byte[] data, final 
AddCallback cb, final Object ctx)
+            throws BKException {
+        LOG.error("To use this feature Ledger must be created with 
createLedgerAdv() interface.");
+        cb.addComplete(BKException.Code.IllegalOpException, LedgerHandle.this, 
entryId, ctx);
+    }
+
+    /**
      * Add entry asynchronously to an open ledger, using an offset and range.
      *
      * @param data
@@ -539,8 +598,35 @@ public class LedgerHandle {
     }
 
     /**
-     * Make a recovery add entry request. Recovery adds can add to a ledger 
even if
-     * it has been fenced.
+     * Add entry asynchronously to an open ledger, using an offset and range.
+     * This can be used only with {@link LedgerHandleAdv} returned through
+     * ledgers created with {@link BookKeeper#createLedgerAdv(int, int, int, 
DigestType, byte[])}.
+     *
+     * @param entryId
+     *            entryId of the entry to add.
+     * @param data
+     *            array of bytes to be written
+     * @param offset
+     *            offset from which to take bytes from data
+     * @param length
+     *            number of bytes to take from data
+     * @param cb
+     *            object implementing callback interface
+     * @param ctx
+     *            some control object
+     * @throws ArrayIndexOutOfBoundsException
+     *             if offset or length is negative or offset and length sum to 
a
+     *             value higher than the length of data.
+     */
+    public void asyncAddEntry(final long entryId, final byte[] data, final int 
offset, final int length,
+            final AddCallback cb, final Object ctx) throws BKException {
+        LOG.error("To use this feature Ledger must be created with 
createLedgerAdv() interface.");
+        cb.addComplete(BKException.Code.IllegalOpException, LedgerHandle.this, 
entryId, ctx);
+    }
+
+    /**
+     * Make a recovery add entry request. Recovery adds can add to a ledger 
even
+     * if it has been fenced.
      *
      * This is only valid for bookie and ledger recovery, which may need to 
replicate
      * entries to a quorum of bookies to ensure data safety.
@@ -553,13 +639,14 @@ public class LedgerHandle {
         doAsyncAddEntry(op, data, offset, length, cb, ctx);
     }
 
-    private void doAsyncAddEntry(final PendingAddOp op, final byte[] data, 
final int offset, final int length,
-                                 final AddCallback cb, final Object ctx) {
+    void doAsyncAddEntry(final PendingAddOp op, final byte[] data, final int 
offset, final int length,
+                         final AddCallback cb, final Object ctx) {
+
         if (offset < 0 || length < 0
                 || (offset + length) > data.length) {
             throw new ArrayIndexOutOfBoundsException(
-                "Invalid values for offset("+offset
-                +") or length("+length+")");
+                    "Invalid values for offset(" +offset
+                    +") or length("+length+")");
         }
         throttler.acquire();
 
@@ -592,6 +679,7 @@ public class LedgerHandle {
                         cb.addComplete(BKException.Code.LedgerClosedException,
                                 LedgerHandle.this, INVALID_ENTRY_ID, ctx);
                     }
+
                     @Override
                     public String toString() {
                         return 
String.format("AsyncAddEntryToClosedLedger(lid=%d)", ledgerId);
@@ -701,6 +789,7 @@ public class LedgerHandle {
         }
         ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new 
ReadLastConfirmedOp.LastConfirmedDataCallback() {
             AtomicBoolean completed = new AtomicBoolean(false);
+
             @Override
             public void readLastConfirmedDataComplete(int rc, 
DigestManager.RecoveryData data) {
                 if (rc == BKException.Code.OK) {
@@ -842,11 +931,17 @@ public class LedgerHandle {
         // Start from the head of the queue and proceed while there are
         // entries that have had all their responses come back
         PendingAddOp pendingAddOp;
+
         while ((pendingAddOp = pendingAddOps.peek()) != null
                && blockAddCompletions.get() == 0) {
             if (!pendingAddOp.completed) {
                 return;
             }
+            // Check if it is the next entry in the sequence.
+            if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != 
lastAddConfirmed + 1) {
+                LOG.debug("Head of the queue entryId: {} is not lac: {} + 1", 
pendingAddOp.entryId, lastAddConfirmed);
+                return;
+            }
             pendingAddOps.remove();
             lastAddConfirmed = pendingAddOp.entryId;
             pendingAddOp.submitCallback(BKException.Code.OK);
@@ -1219,7 +1314,7 @@ public class LedgerHandle {
         }
     }
 
-    private static class SyncAddCallback implements AddCallback {
+    static class SyncAddCallback implements AddCallback {
         long entryId = -1;
 
         /**

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
new file mode 100644
index 0000000..00fcfa7
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -0,0 +1,214 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.client;
+
+import java.io.Serializable;
+import java.security.GeneralSecurityException;
+import java.util.Comparator;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ledger Advanced handle extends {@link LedgerHandle} to provide API to add 
entries with
+ * user supplied entryIds. Through this interface Ledger Length may not be 
accurate while the
+ * ledger is being written.
+ */
+public class LedgerHandleAdv extends LedgerHandle {
+    final static Logger LOG = LoggerFactory.getLogger(LedgerHandleAdv.class);
+
+    static class PendingOpsComparator implements Comparator<PendingAddOp>, 
Serializable {
+        public int compare(PendingAddOp o1, PendingAddOp o2) {
+            return Long.compare(o1.entryId, o2.entryId);
+        }
+    }
+
+    LedgerHandleAdv(BookKeeper bk, long ledgerId, LedgerMetadata metadata, 
DigestType digestType, byte[] password)
+            throws GeneralSecurityException, NumberFormatException {
+        super(bk, ledgerId, metadata, digestType, password);
+        pendingAddOps = new PriorityBlockingQueue<PendingAddOp>(10, new 
PendingOpsComparator());
+    }
+
+
+    /**
+     * Add entry synchronously to an open ledger.
+     *
+     * @param entryId
+     *            entryId of the entry to add
+     * @param data
+     *            array of bytes to be written to the ledger
+     * @return
+     *            entryId that is just created.
+     */
+    @Override
+    public long addEntry(final long entryId, byte[] data) throws 
InterruptedException, BKException {
+
+        return addEntry(entryId, data, 0, data.length);
+
+    }
+
+    /**
+     * Add entry synchronously to an open ledger.
+     *
+     * @param entryId
+     *            entryId of the entry to add
+     * @param data
+     *            array of bytes to be written to the ledger
+     * @param offset
+     *            offset from which to take bytes from data
+     * @param length
+     *            number of bytes to take from data
+     * @return The entryId of newly inserted entry.
+     */
+    @Override
+    public long addEntry(final long entryId, byte[] data, int offset, int 
length) throws InterruptedException,
+            BKException {
+        LOG.debug("Adding entry {}", data);
+
+        SyncCounter counter = new SyncCounter();
+        counter.inc();
+
+        SyncAddCallback callback = new SyncAddCallback();
+        asyncAddEntry(entryId, data, offset, length, callback, counter);
+
+        counter.block(0);
+
+        if (counter.getrc() != BKException.Code.OK) {
+            throw BKException.create(counter.getrc());
+        }
+        return callback.entryId;
+    }
+
+    /**
+     * Add entry asynchronously to an open ledger.
+     *
+     * @param entryId
+     *            entryId of the entry to add
+     * @param data
+     *            array of bytes to be written
+     * @param cb
+     *            object implementing callback interface
+     * @param ctx
+     *            some control object
+     */
+    @Override
+    public void asyncAddEntry(long entryId, byte[] data, AddCallback cb, 
Object ctx) throws BKException {
+        asyncAddEntry(entryId, data, 0, data.length, cb, ctx);
+    }
+
+    /**
+     * Add entry asynchronously to an open ledger, using an offset and range.
+     *
+     * @param entryId
+     *            entryId of the entry to add
+     * @param data
+     *            array of bytes to be written
+     * @param offset
+     *            offset from which to take bytes from data
+     * @param length
+     *            number of bytes to take from data
+     * @param cb
+     *            object implementing callback interface
+     * @param ctx
+     *            some control object
+     * @throws ArrayIndexOutOfBoundsException
+     *             if offset or length is negative or offset and length sum to 
a
+     *             value higher than the length of data.
+     */
+
+    public void asyncAddEntry(final long entryId, final byte[] data, final int 
offset, final int length,
+            final AddCallback cb, final Object ctx) {
+        PendingAddOp op = new PendingAddOp(this, cb, ctx);
+        op.setEntryId(entryId);
+        if ((entryId <= this.lastAddConfirmed) || pendingAddOps.contains(op)) {
+            LOG.error("Trying to re-add duplicate entryid:{}", entryId);
+            cb.addComplete(BKException.Code.DuplicateEntryIdException,
+                    LedgerHandleAdv.this, entryId, ctx);
+            return;
+        }
+        pendingAddOps.add(op);
+
+        doAsyncAddEntry(op, data, offset, length, cb, ctx);
+    }
+
+    /**
+     * Overriding part is mostly around setting entryId.
+     * Though there may be some code duplication, we chose to override the 
routine so the control flow is
+     * unaltered in the base class.
+     */
+    @Override
+    void doAsyncAddEntry(final PendingAddOp op, final byte[] data, final int 
offset, final int length,
+            final AddCallback cb, final Object ctx) {
+        if (offset < 0 || length < 0
+                || (offset + length) > data.length) {
+            throw new ArrayIndexOutOfBoundsException(
+                "Invalid values for offset("+offset
+                +") or length("+length+")");
+        }
+        throttler.acquire();
+
+        if (metadata.isClosed()) {
+            // make sure the callback is triggered in main worker pool
+            try {
+                bk.mainWorkerPool.submit(new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        LOG.warn("Attempt to add to closed ledger: {}", 
ledgerId);
+                        cb.addComplete(BKException.Code.LedgerClosedException,
+                                LedgerHandleAdv.this, op.getEntryId(), ctx);
+                    }
+                    @Override
+                    public String toString() {
+                        return 
String.format("AsyncAddEntryToClosedLedger(lid=%d)", ledgerId);
+                    }
+                });
+            } catch (RejectedExecutionException e) {
+                
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
+                        LedgerHandleAdv.this, op.getEntryId(), ctx);
+            }
+            return;
+        }
+
+        try {
+            final long currentLength = addToLength(length);
+
+            bk.mainWorkerPool.submit(new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    ChannelBuffer toSend = 
macManager.computeDigestAndPackageForSending(
+                                               op.getEntryId(), 
lastAddConfirmed, currentLength, data, offset, length);
+                    op.initiate(toSend, length);
+                }
+            });
+        } catch (RejectedExecutionException e) {
+            
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
+                    LedgerHandleAdv.this, op.getEntryId(), ctx);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 4034c35..bc487f6 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -91,6 +91,10 @@ class PendingAddOp implements WriteCallback, TimerTask {
         writeSet = new 
HashSet<Integer>(lh.distributionSchedule.getWriteSet(entryId));
     }
 
+    long getEntryId() {
+        return this.entryId;
+    }
+
     void sendWriteRequest(int bookieIndex) {
         int flags = isRecoveryAdd ? BookieProtocol.FLAG_RECOVERY_ADD : 
BookieProtocol.FLAG_NONE;
 
@@ -249,4 +253,17 @@ class PendingAddOp implements WriteCallback, TimerTask {
         return sb.toString();
     }
 
+    @Override
+    public int hashCode() {
+        return (int) entryId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+       if (o instanceof PendingAddOp) {
+           return (this.entryId == ((PendingAddOp)o).entryId);
+       }
+       return (this == o);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 7b77c48..692c480 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -131,6 +131,48 @@ public class BookieWriteLedgerTest extends
     }
 
     /**
+     * Verify the functionality of Advanced Ledger which returns
+     * LedgerHandleAdv. LedgerHandleAdv takes entryId for addEntry, and lets
+     * the user manage entryId allocation.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testLedgerCreateAdv() throws Exception {
+        // Create a ledger
+        lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+        for (int i = 0; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            entries1.add(entry.array());
+            lh.addEntry(i, entry.array());
+        }
+        // Start one more bookie
+        startNewBookie();
+
+        // Shutdown one bookie in the last ensemble and continue writing
+        ArrayList<BookieSocketAddress> ensemble = 
lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+                .getValue();
+        killBookie(ensemble.get(0));
+
+        int i = numEntriesToWrite;
+        numEntriesToWrite = numEntriesToWrite + 50;
+        for (; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            entries1.add(entry.array());
+            lh.addEntry(i, entry.array());
+        }
+
+        readEntries(lh, entries1);
+        lh.close();
+    }
+
+    /**
      * Verify asynchronous writing when few bookie failures in last ensemble.
      */
     @Test(timeout=60000)
@@ -203,8 +245,327 @@ public class BookieWriteLedgerTest extends
         lh2.close();
     }
 
-    private void readEntries(LedgerHandle lh, ArrayList<byte[]> entries)
-            throws InterruptedException, BKException {
+    /**
+     * Verify advanced asynchronous writing with entryIds issued in reverse
+     * order (highest id first) on two ledgers, replacing one bookie of the
+     * ensemble after the adds are issued and before the acknowledgements
+     * are awaited.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testLedgerCreateAdvWithAsyncWritesWithBookieFailures() throws Exception {
+        // Create ledgers
+        lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+        lh2 = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+
+        LOG.info("Ledger ID-1: " + lh.getId());
+        LOG.info("Ledger ID-2: " + lh2.getId());
+        SyncObj syncObj1 = new SyncObj();
+        SyncObj syncObj2 = new SyncObj();
+        for (int i = numEntriesToWrite - 1; i >= 0; i--) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            // Ids are issued in descending order, so prepending keeps the
+            // expected payloads in ascending entryId order. Deliberately not
+            // wrapped in try/catch: a failure here must fail the test instead
+            // of being printed and ignored.
+            entries1.add(0, entry.array());
+            entries2.add(0, entry.array());
+            lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj1);
+            lh2.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj2);
+        }
+        // Start one more bookie and shut down the first bookie of the first
+        // ensemble listed in the ledger metadata before reading
+        startNewBookie();
+        ArrayList<BookieSocketAddress> ensemble =
+                lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next().getValue();
+        killBookie(ensemble.get(0));
+
+        // Wait for all entries to be acknowledged for the first ledger
+        synchronized (syncObj1) {
+            while (syncObj1.counter < numEntriesToWrite) {
+                syncObj1.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj1.rc);
+        }
+        // Wait for all entries to be acknowledged for the second ledger
+        synchronized (syncObj2) {
+            while (syncObj2.counter < numEntriesToWrite) {
+                syncObj2.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj2.rc);
+        }
+
+        // Reading ledger till the last entry
+        readEntries(lh, entries1);
+        readEntries(lh2, entries2);
+        lh.close();
+        lh2.close();
+    }
+
+    /**
+     * Verify advanced asynchronous writing with entryIds in pseudo random
+     * (strided) order on two ledgers, with a bookie failure injected between
+     * writes: when the id numEntriesToWrite/2 has been issued, a new bookie
+     * is started and one bookie of the ensemble is killed.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testLedgerCreateAdvWithRandomAsyncWritesWithBookieFailuresBetweenWrites() throws Exception {
+        // Create ledgers
+        lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+        lh2 = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+
+        LOG.info("Ledger ID-1: " + lh.getId());
+        LOG.info("Ledger ID-2: " + lh2.getId());
+        SyncObj syncObj1 = new SyncObj();
+        SyncObj syncObj2 = new SyncObj();
+        int batchSize = 5;
+        int i, j;
+
+        // Fill the result buffers first. Deliberately not wrapped in
+        // try/catch: a failure while building the expected data must fail
+        // the test instead of being printed and ignored.
+        for (i = 0; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            entries1.add(0, entry.array());
+            entries2.add(0, entry.array());
+        }
+
+        // Issue the adds in strides of batchSize: pass i writes the ids
+        // i, i + batchSize, i + 2 * batchSize, ... so that every id is
+        // written exactly once but not in ascending order.
+        for (i = 0; i < batchSize; i++) {
+            for (j = i; j < numEntriesToWrite; j = j + batchSize) {
+                byte[] entry1 = entries1.get(j);
+                byte[] entry2 = entries2.get(j);
+                lh.asyncAddEntry(j, entry1, 0, entry1.length, this, syncObj1);
+                lh2.asyncAddEntry(j, entry2, 0, entry2.length, this, syncObj2);
+                if (j == numEntriesToWrite/2) {
+                    // Start one more bookie and shut down the first bookie of
+                    // the first ensemble listed in the metadata at half-way
+                    startNewBookie();
+                    ArrayList<BookieSocketAddress> ensemble =
+                            lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next().getValue();
+                    killBookie(ensemble.get(0));
+                }
+            }
+        }
+
+        // Wait for all entries to be acknowledged for the first ledger
+        synchronized (syncObj1) {
+            while (syncObj1.counter < numEntriesToWrite) {
+                syncObj1.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj1.rc);
+        }
+        // Wait for all entries to be acknowledged for the second ledger
+        synchronized (syncObj2) {
+            while (syncObj2.counter < numEntriesToWrite) {
+                syncObj2.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj2.rc);
+        }
+
+        // Reading ledger till the last entry
+        readEntries(lh, entries1);
+        readEntries(lh2, entries2);
+        lh.close();
+        lh2.close();
+    }
+
+    /**
+     * Verify advanced asynchronous writing with entryIds in pseudo random
+     * (strided) order on two ledgers; after all adds have been issued a new
+     * bookie is started and one bookie of the ensemble is killed before the
+     * acknowledgements are awaited.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testLedgerCreateAdvWithRandomAsyncWritesWithBookieFailures() throws Exception {
+        // Create ledgers
+        lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+        lh2 = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+
+        LOG.info("Ledger ID-1: " + lh.getId());
+        LOG.info("Ledger ID-2: " + lh2.getId());
+        SyncObj syncObj1 = new SyncObj();
+        SyncObj syncObj2 = new SyncObj();
+        int batchSize = 5;
+        int i, j;
+
+        // Fill the result buffers first. Deliberately not wrapped in
+        // try/catch: a failure while building the expected data must fail
+        // the test instead of being printed and ignored.
+        for (i = 0; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            entries1.add(0, entry.array());
+            entries2.add(0, entry.array());
+        }
+
+        // Issue the adds in strides of batchSize: pass i writes the ids
+        // i, i + batchSize, i + 2 * batchSize, ... so that every id is
+        // written exactly once but not in ascending order.
+        for (i = 0; i < batchSize; i++) {
+            for (j = i; j < numEntriesToWrite; j = j + batchSize) {
+                byte[] entry1 = entries1.get(j);
+                byte[] entry2 = entries2.get(j);
+                lh.asyncAddEntry(j, entry1, 0, entry1.length, this, syncObj1);
+                lh2.asyncAddEntry(j, entry2, 0, entry2.length, this, syncObj2);
+            }
+        }
+        // Start one more bookie and shut down the first bookie of the first
+        // ensemble listed in the ledger metadata before reading
+        startNewBookie();
+        ArrayList<BookieSocketAddress> ensemble =
+                lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next().getValue();
+        killBookie(ensemble.get(0));
+
+        // Wait for all entries to be acknowledged for the first ledger
+        synchronized (syncObj1) {
+            while (syncObj1.counter < numEntriesToWrite) {
+                syncObj1.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj1.rc);
+        }
+        // Wait for all entries to be acknowledged for the second ledger
+        synchronized (syncObj2) {
+            while (syncObj2.counter < numEntriesToWrite) {
+                syncObj2.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj2.rc);
+        }
+
+        // Reading ledger till the last entry
+        readEntries(lh, entries1);
+        readEntries(lh2, entries2);
+        lh.close();
+        lh2.close();
+    }
+
+    /**
+     * Skips one entryId while writing in reverse order, closes the ledger
+     * and asserts that, after reopening, lastAddConfirmed is the id right
+     * before the skipped one.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testLedgerCreateAdvWithSkipEntries() throws Exception {
+        long ledgerId;
+        SyncObj syncObj1 = new SyncObj();
+
+        // Create a ledger
+        lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+        // Save ledgerId to reopen the ledger
+        ledgerId = lh.getId();
+        LOG.info("Ledger ID: " + ledgerId);
+        int skipEntryId = rng.nextInt(numEntriesToWrite - 1);
+        for (int i = numEntriesToWrite - 1; i >= 0; i--) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            // Deliberately not wrapped in try/catch: a failure here must fail
+            // the test instead of being printed and ignored.
+            entries1.add(0, entry.array());
+            if (i == skipEntryId) {
+                LOG.info("Skipping entry:{}", skipEntryId);
+                continue;
+            }
+            lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj1);
+        }
+        // Wait for skipEntryId acknowledgements only, i.e. as many as there
+        // are ids below the gap.
+        // NOTE(review): this presumes adds above the skipped id are not
+        // acknowledged before close -- confirm against LedgerHandleAdv.
+        synchronized (syncObj1) {
+            while (syncObj1.counter < skipEntryId) {
+                syncObj1.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj1.rc);
+        }
+        // Close the ledger
+        lh.close();
+        // Open the ledger
+        lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+        // JUnit argument order is (expected, actual), so a failure message
+        // reports the values the right way round.
+        assertEquals(skipEntryId - 1, lh.lastAddConfirmed);
+        lh.close();
+    }
+
+    /**
+     * Verify that the synchronous LedgerHandleAdv addEntry rejects an
+     * entryId that has already been written: the second add is expected to
+     * throw a BKException whose code is DuplicateEntryIdException.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testLedgerCreateAdvSyncAddDuplicateEntryIds() throws Exception {
+        // Create a ledger
+        lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+        LOG.info("Ledger ID: " + lh.getId());
+        for (int i = 0; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+
+            entries1.add(entry.array());
+            lh.addEntry(i, entry.array());
+        }
+        readEntries(lh, entries1);
+
+        // Pick one of the ids written above and try to write it again
+        int dupEntryId = rng.nextInt(numEntriesToWrite - 1);
+
+        try {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            lh.addEntry(dupEntryId, entry.array());
+            fail("Expected exception not thrown");
+        } catch (BKException e) {
+            // This test expects DuplicateEntryIdException; JUnit argument
+            // order is (expected, actual).
+            assertEquals(BKException.Code.DuplicateEntryIdException, e.getCode());
+        }
+        lh.close();
+    }
+
+    /**
+     * Verify the functionality of LedgerHandleAdv asyncAddEntry with
+     * duplicate entryIds: entries are added in reverse order and, at random,
+     * the same entryId is submitted a second time, which is expected to
+     * complete with DuplicateEntryIdException while the original adds all
+     * complete with OK.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testLedgerCreateAdvSyncAsyncAddDuplicateEntryIds() throws Exception {
+        long ledgerId;
+        SyncObj syncObj1 = new SyncObj();
+        SyncObj syncObj2 = new SyncObj();
+        // Number of duplicate adds issued so far; syncObj2.counter has to
+        // reach this value before the latest duplicate's rc may be checked.
+        int duplicateAdds = 0;
+
+        // Create a ledger
+        lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+        // Keep the ledgerId for logging
+        ledgerId = lh.getId();
+        LOG.info("Ledger ID: " + ledgerId);
+        for (int i = numEntriesToWrite - 1; i >= 0; i--) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            // Deliberately not wrapped in try/catch: a failure here must fail
+            // the test instead of being printed and ignored.
+            entries1.add(0, entry.array());
+            lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj1);
+            if (rng.nextBoolean()) {
+                // Attempt to write the same entryId again
+                lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj2);
+                duplicateAdds++;
+                synchronized (syncObj2) {
+                    // Waiting for "counter >= 1" would only guard the very
+                    // first duplicate; wait for this duplicate's own callback
+                    // so the rc checked below is not a stale one.
+                    while (syncObj2.counter < duplicateAdds) {
+                        syncObj2.wait();
+                    }
+                    assertEquals(BKException.Code.DuplicateEntryIdException, syncObj2.rc);
+                }
+            }
+        }
+        // Wait for all original (non-duplicate) adds to be acknowledged
+        synchronized (syncObj1) {
+            while (syncObj1.counter < numEntriesToWrite) {
+                syncObj1.wait();
+            }
+            assertEquals(BKException.Code.OK, syncObj1.rc);
+        }
+        // Close the ledger
+        lh.close();
+    }
+
+    private void readEntries(LedgerHandle lh, ArrayList<byte[]> entries) 
throws InterruptedException, BKException {
         ls = lh.readEntries(0, numEntriesToWrite - 1);
         int index = 0;
         while (ls.hasMoreElements()) {

Reply via email to