Author: breed Date: Fri Jul 9 20:53:36 2010 New Revision: 962693 URL: http://svn.apache.org/viewvc?rev=962693&view=rev Log: ZOOKEEPER-719. Add throttling to BookKeeper client
Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=962693&r1=962692&r2=962693&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Fri Jul 9 20:53:36 2010 @@ -64,6 +64,8 @@ BUGFIXES: ZOOKEEPER-796. zkServer.sh should support an external PIDFILE variable (Alex Newman via phunt) + ZOOKEEPER-719. Add throttling to BookKeeper client (fpj via breed) + IMPROVEMENTS: ZOOKEEPER-724. Improve junit test integration - log harness information (phunt via mahadev) Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java?rev=962693&r1=962692&r2=962693&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java Fri Jul 9 20:53:36 2010 @@ -70,6 +70,8 @@ public abstract class BKException extend return new BKWriteException(); case Code.NoSuchEntryException: return new BKNoSuchEntryException(); + case Code.IncorrectParameterException: + return new BKIncorrectParameterException(); default: return new BKIllegalOpException(); } @@ -94,7 +96,8 @@ public abstract class BKException extend int LedgerClosedException = -11; int WriteException = -12; int NoSuchEntryException = -13; - + int IncorrectParameterException = -14; + int IllegalOpException = -100; } @@ -136,6 +139,8 @@ public abstract class BKException extend return "Write failed on bookie"; case Code.NoSuchEntryException: return "No such entry"; + case Code.IncorrectParameterException: + return "Incorrect parameter input"; default: return "Invalid operation"; } @@ -224,4 +229,10 @@ public abstract class BKException extend super(Code.LedgerClosedException); } } + + public static class BKIncorrectParameterException extends BKException { + public BKIncorrectParameterException() { + super(Code.IncorrectParameterException); + } + } } Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=962693&r1=962692&r2=962693&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java Fri Jul 9 20:53:36 2010 @@ -145,6 +145,10 @@ class LedgerCreateOp implements StringCa LOG.error("Security exception while creating ledger: " + ledgerId, e); cb.createComplete(BKException.Code.DigestNotInitializedException, null, this.ctx); return; + } catch (NumberFormatException e) { + LOG.error("Incorrectly entered parameter throttle: " + System.getProperty("throttle"), e); + cb.createComplete(BKException.Code.IncorrectParameterException, null, this.ctx); + return; } lh.writeLedgerConfig(this, null); Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=962693&r1=962692&r2=962693&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Jul 9 20:53:36 2010 @@ -27,6 +27,8 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Enumeration; import java.util.Queue; +import java.util.concurrent.Semaphore; + import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; @@ -61,10 +63,14 @@ public class LedgerHandle implements Rea final DigestManager macManager; final DistributionSchedule distributionSchedule; + final Semaphore opCounterSem; + private Integer throttling = 5000; + final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>(); LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata, - DigestType digestType, byte[] password) throws GeneralSecurityException { + DigestType digestType, byte[] password) + throws GeneralSecurityException, NumberFormatException { this.bk = bk; this.metadata = metadata; if (metadata.isClosed()) { @@ -72,14 +78,21 @@ public class LedgerHandle implements Rea } else { lastAddConfirmed = lastAddPushed = -1; } - + this.ledgerId = ledgerId; + + String throttleValue = System.getProperty("throttle"); + if(throttleValue != null){ + this.throttling = new Integer(throttleValue); + } + this.opCounterSem = new Semaphore(throttling); + macManager = DigestManager.instantiate(ledgerId, password, digestType); this.ledgerKey = MacDigestManager.genDigest("ledger", password); distributionSchedule = new RoundRobinDistributionSchedule( metadata.quorumSize, metadata.ensembleSize); } - + /** * Get the id of the current ledger * @@ -219,7 +232,7 @@ public class LedgerHandle implements Rea * control object */ public void asyncReadEntries(long firstEntry, long lastEntry, - ReadCallback cb, Object ctx) { + ReadCallback cb, Object ctx) throws InterruptedException { // Little sanity check if (firstEntry < 0 || lastEntry > lastAddConfirmed || firstEntry > lastEntry) { @@ -228,7 +241,7 @@ public class LedgerHandle implements Rea } new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate(); - + opCounterSem.acquire(); } /** @@ -260,8 +273,8 @@ public class LedgerHandle implements Rea * some control object */ public void asyncAddEntry(final byte[] data, final AddCallback cb, - final Object ctx) { - bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() { + final Object ctx) throws InterruptedException { + bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() { @Override public void safeRun() { if (metadata.isClosed()) { @@ -279,7 +292,8 @@ public class LedgerHandle implements Rea op.initiate(toSend); } - }); + }); + opCounterSem.acquire(); } // close the ledger and send fails to all the adds in the pipeline Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=962693&r1=962692&r2=962693&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java Fri Jul 9 20:53:36 2010 @@ -114,6 +114,10 @@ class LedgerOpenOp implements DataCallba LOG.error("Security exception while opening ledger: " + ledgerId, e); cb.openComplete(BKException.Code.DigestNotInitializedException, null, this.ctx); return; + } catch (NumberFormatException e) { + LOG.error("Incorrectly entered parameter throttle: " + System.getProperty("throttle"), e); + cb.openComplete(BKException.Code.IncorrectParameterException, null, this.ctx); + return; } if (metadata.close != LedgerMetadata.NOTCLOSED) { Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java?rev=962693&r1=962692&r2=962693&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java Fri Jul 9 20:53:36 2010 @@ -117,9 +117,13 @@ class LedgerRecoveryOp implements ReadEn * Try to read past the last confirmed. */ private void doRecoveryRead() { - lh.lastAddConfirmed++; - lh.asyncReadEntries(lh.lastAddConfirmed, lh.lastAddConfirmed, this, null); - + try{ + lh.lastAddConfirmed++; + lh.asyncReadEntries(lh.lastAddConfirmed, lh.lastAddConfirmed, this, null); + } catch (InterruptedException e) { + LOG.error("Interrupted while trying to read entry.", e); + Thread.currentThread().interrupt(); + } } @Override @@ -127,7 +131,12 @@ class LedgerRecoveryOp implements ReadEn // get back to prev value lh.lastAddConfirmed--; if (rc == BKException.Code.OK) { - lh.asyncAddEntry(seq.nextElement().getEntry(), this, null); + try{ + lh.asyncAddEntry(seq.nextElement().getEntry(), this, null); + } catch (InterruptedException e) { + LOG.error("Interrupted while adding entry.", e); + Thread.currentThread().interrupt(); + } return; } Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java?rev=962693&r1=962692&r2=962693&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java Fri Jul 9 20:53:36 2010 @@ -132,6 +132,7 @@ class PendingAddOp implements WriteCallb void submitCallback(final int rc) { cb.addComplete(rc, lh, entryId, ctx); + lh.opCounterSem.release(); } } \ No newline at end of file Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=962693&r1=962692&r2=962693&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java Fri Jul 9 20:53:36 2010 @@ -87,7 +87,7 @@ class PendingReadOp implements Enumerati if (entry.nextReplicaIndexToReadFrom >= lh.metadata.quorumSize) { // we are done, the read has failed from all replicas, just fail the // read - cb.readComplete(lastErrorCode, lh, null, ctx); + submitCallback(lastErrorCode); return; } @@ -126,11 +126,15 @@ class PendingReadOp implements Enumerati entry.entryDataStream = is; if (numPendingReads == 0) { - cb.readComplete(BKException.Code.OK, lh, PendingReadOp.this, PendingReadOp.this.ctx); + submitCallback(BKException.Code.OK); } } + private void submitCallback(int code){ + cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx); + lh.opCounterSem.release(); + } public boolean hasMoreElements() { return !seq.isEmpty(); } Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=962693&r1=962692&r2=962693&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Fri Jul 9 20:53:36 2010 @@ -23,6 +23,7 @@ import java.net.InetSocketAddress; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.client.BKException; @@ -69,6 +70,7 @@ public class PerChannelBookieClient exte InetSocketAddress addr; boolean connected = false; + Semaphore opCounterSem = new Semaphore(2000); AtomicLong totalBytesOutstanding; ClientSocketChannelFactory channelFactory; OrderedSafeExecutor executor; @@ -206,6 +208,7 @@ public class PerChannelBookieClient exte Object ctx) { final int entrySize = toSend.readableBytes(); + // if (totalBytesOutstanding.get() > maxMemory) { // // TODO: how to throttle, throw an exception, or call the callback? // // Maybe this should be done at the layer above? Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=962693&r1=962692&r2=962693&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java Fri Jul 9 20:53:36 2010 @@ -238,6 +238,101 @@ public class BookieReadWriteTest extends } @Test + public void testReadWriteAsyncSingleClientThrottle() throws IOException { + try { + // Create a BookKeeper client and a ledger + System.setProperty("throttle", "1000"); + bkc = new BookKeeper("127.0.0.1"); + lh = bkc.createLedger(digestType, ledgerPassword); + // bkc.initMessageDigest("SHA1"); + ledgerId = lh.getId(); + LOG.info("Ledger ID: " + lh.getId()); + + numEntriesToWrite = 20000; + for (int i = 0; i < (numEntriesToWrite - 10000); i++) { + ByteBuffer entry = ByteBuffer.allocate(4); + entry.putInt(rng.nextInt(maxInt)); + entry.position(0); + + entries.add(entry.array()); + entriesSize.add(entry.array().length); + lh.asyncAddEntry(entry.array(), this, sync); + } + + + for (int i = 0; i < 10000; i++) { + ByteBuffer entry = ByteBuffer.allocate(4); + entry.putInt(rng.nextInt(maxInt)); + entry.position(0); + + entries.add(entry.array()); + entriesSize.add(entry.array().length); + lh.asyncAddEntry(entry.array(), this, sync); + } + + // wait for all entries to be acknowledged + synchronized (sync) { + while (sync.counter < numEntriesToWrite) { + LOG.debug("Entries counter = " + sync.counter); + sync.wait(); + } + } + + LOG.debug("*** WRITE COMPLETE ***"); + // close ledger + lh.close(); + + // *** WRITING PART COMPLETE // READ PART BEGINS *** + + // open ledger + lh = bkc.openLedger(ledgerId, digestType, ledgerPassword); + LOG.debug("Number of entries written: " + (lh.getLastAddConfirmed() + 1)); + assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1)); + + // read entries + lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync); + + synchronized (sync) { + while (sync.value == false) { + sync.wait(); + } + } + + LOG.debug("*** READ COMPLETE ***"); + + // at this point, LedgerSequence ls is filled with the returned + // values + int i = 0; + while (ls.hasMoreElements()) { + ByteBuffer origbb = ByteBuffer.wrap(entries.get(i)); + Integer origEntry = origbb.getInt(); + byte[] entry = ls.nextElement().getEntry(); + ByteBuffer result = ByteBuffer.wrap(entry); + LOG.debug("Length of result: " + result.capacity()); + LOG.debug("Original entry: " + origEntry); + + Integer retrEntry = result.getInt(); + LOG.debug("Retrieved entry: " + retrEntry); + assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry)); + assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue()); + i++; + } + assertTrue("Checking number of read entries", i == numEntriesToWrite); + + lh.close(); + } catch (KeeperException e) { + LOG.error("Test failed", e); + fail("Test failed due to ZooKeeper exception"); + } catch (BKException e) { + LOG.error("Test failed", e); + fail("Test failed due to BookKeeper exception"); + } catch (InterruptedException e) { + LOG.error("Test failed", e); + fail("Test failed due to interruption"); + } + } + + @Test public void testSyncReadAsyncWriteStringsSingleClient() throws IOException { LOG.info("TEST READ WRITE STRINGS MIXED SINGLE CLIENT"); String charset = "utf-8";