Author: breed Date: Fri Sep 17 16:58:07 2010 New Revision: 998200 URL: http://svn.apache.org/viewvc?rev=998200&view=rev Log: ZOOKEEPER-831. BookKeeper: Throttling improved for reads
Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=998200&r1=998199&r2=998200&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Fri Sep 17 16:58:07 2010 @@ -96,6 +96,8 @@ BUGFIXES: ZOOKEEPER-870. Zookeeper trunk build broken. (mahadev via phunt) + ZOOKEEPER-831. BookKeeper: Throttling improved for reads (breed via fpj) + IMPROVEMENTS: ZOOKEEPER-724. Improve junit test integration - log harness information (phunt via mahadev) Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=998200&r1=998199&r2=998200&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Sep 17 16:58:07 2010 @@ -67,7 +67,7 @@ public class LedgerHandle implements Rea private Integer throttling = 5000; final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>(); - + LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata, DigestType digestType, byte[] password) throws GeneralSecurityException, NumberFormatException { @@ -149,6 +149,15 @@ public class LedgerHandle implements Rea } /** + * Return total number of available slots. + * + * @return int available slots + */ + Semaphore getAvailablePermits(){ + return this.opCounterSem; + } + + /** * Get the Distribution Schedule * * @return DistributionSchedule for the LedgerHandle @@ -277,7 +286,6 @@ public class LedgerHandle implements Rea } new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate(); - opCounterSem.acquire(); } /** @@ -310,26 +318,32 @@ public class LedgerHandle implements Rea */ public void asyncAddEntry(final byte[] data, final AddCallback cb, final Object ctx) throws InterruptedException { - bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() { - @Override - public void safeRun() { - if (metadata.isClosed()) { - LOG.warn("Attempt to add to closed ledger: " + ledgerId); - cb.addComplete(BKException.Code.LedgerClosedException, - LedgerHandle.this, -1, ctx); - return; - } - - long entryId = ++lastAddPushed; - PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId); - pendingAddOps.add(op); - ChannelBuffer toSend = macManager.computeDigestAndPackageForSending( - entryId, lastAddConfirmed, data); - op.initiate(toSend); - - } - }); opCounterSem.acquire(); + + try{ + bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() { + @Override + public void safeRun() { + if (metadata.isClosed()) { + LOG.warn("Attempt to add to closed ledger: " + ledgerId); + LedgerHandle.this.opCounterSem.release(); + cb.addComplete(BKException.Code.LedgerClosedException, + LedgerHandle.this, -1, ctx); + return; + } + + long entryId = ++lastAddPushed; + PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId); + pendingAddOps.add(op); + ChannelBuffer toSend = macManager.computeDigestAndPackageForSending( + entryId, lastAddConfirmed, data); + op.initiate(toSend); + } + }); + } catch (RuntimeException e) { + opCounterSem.release(); + throw e; + } } // close the ledger and send fails to all the adds in the pipeline Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=998200&r1=998199&r2=998200&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java Fri Sep 17 16:58:07 2010 @@ -64,12 +64,18 @@ class PendingReadOp implements Enumerati numPendingReads = endEntryId - startEntryId + 1; } - public void initiate() { + public void initiate() throws InterruptedException { long nextEnsembleChange = startEntryId, i = startEntryId; ArrayList<InetSocketAddress> ensemble = null; do { + if(LOG.isDebugEnabled()){ + LOG.debug("Acquiring lock: " + i); + } + + lh.opCounterSem.acquire(); + if (i == nextEnsembleChange) { ensemble = lh.metadata.getEnsemble(i); nextEnsembleChange = lh.metadata.getNextEnsembleChange(i); @@ -80,7 +86,6 @@ class PendingReadOp implements Enumerati sendRead(ensemble, entry, BKException.Code.ReadException); } while (i <= endEntryId); - } void sendRead(ArrayList<InetSocketAddress> ensemble, LedgerEntry entry, int lastErrorCode) { @@ -114,7 +119,6 @@ class PendingReadOp implements Enumerati return; } - numPendingReads--; ChannelBufferInputStream is; try { is = lh.macManager.verifyDigestAndReturnData(entryId, buffer); @@ -125,15 +129,23 @@ class PendingReadOp implements Enumerati entry.entryDataStream = is; + numPendingReads--; if (numPendingReads == 0) { submitCallback(BKException.Code.OK); } - + + if(LOG.isDebugEnabled()){ + LOG.debug("Releasing lock: " + entryId); + } + + lh.opCounterSem.release(); + + if(numPendingReads < 0) + LOG.error("Read too many values"); } private void submitCallback(int code){ cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx); - lh.opCounterSem.release(); } public boolean hasMoreElements() { return !seq.isEmpty(); Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=998200&r1=998199&r2=998200&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java Fri Sep 17 16:58:07 2010 @@ -23,12 +23,17 @@ package org.apache.bookkeeper.test; import java.io.File; import java.io.IOException; +import java.lang.NoSuchFieldException; +import java.lang.IllegalAccessException; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Enumeration; import java.util.Random; import java.util.Set; +import java.util.concurrent.Semaphore; + import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.BKException; @@ -83,7 +88,7 @@ public class BookieReadWriteTest extends Set<Object> syncObjs; class SyncObj { - int counter; + volatile int counter; boolean value; public SyncObj() { @@ -237,19 +242,61 @@ public class BookieReadWriteTest extends } } + class ThrottleTestCallback implements ReadCallback { + int throttle; + + ThrottleTestCallback(int threshold){ + this.throttle = threshold; + } + + public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx){ + if(rc != BKException.Code.OK){ + fail("Return code is not OK: " + rc); + } + + ls = seq; + synchronized(sync){ + sync.counter += throttle; + sync.notify(); + } + LOG.info("Current counter: " + sync.counter); + } + } + + /** + * Method for obtaining the available permits of a ledger handle + * using reflection to avoid adding a new public method to the + * class. + * + * @param lh + * @return + */ + @SuppressWarnings("unchecked") + int getAvailablePermits(LedgerHandle lh) throws + NoSuchFieldException, IllegalAccessException + { + Field field = LedgerHandle.class.getDeclaredField("opCounterSem"); + field.setAccessible(true); + return ((Semaphore)field.get(lh)).availablePermits(); + } + @Test - public void testReadWriteAsyncSingleClientThrottle() throws IOException { + public void testReadWriteAsyncSingleClientThrottle() throws + IOException, NoSuchFieldException, IllegalAccessException { try { + + Integer throttle = 100; + ThrottleTestCallback tcb = new ThrottleTestCallback(throttle); // Create a BookKeeper client and a ledger - System.setProperty("throttle", "1000"); + System.setProperty("throttle", throttle.toString()); bkc = new BookKeeper("127.0.0.1"); lh = bkc.createLedger(digestType, ledgerPassword); // bkc.initMessageDigest("SHA1"); ledgerId = lh.getId(); LOG.info("Ledger ID: " + lh.getId()); - numEntriesToWrite = 20000; - for (int i = 0; i < (numEntriesToWrite - 10000); i++) { + numEntriesToWrite = 8000; + for (int i = 0; i < (numEntriesToWrite - 2000); i++) { ByteBuffer entry = ByteBuffer.allocate(4); entry.putInt(rng.nextInt(maxInt)); entry.position(0); @@ -257,10 +304,15 @@ public class BookieReadWriteTest extends entries.add(entry.array()); entriesSize.add(entry.array().length); lh.asyncAddEntry(entry.array(), this, sync); + /* + * Check that the difference is no larger than the throttling threshold + */ + int testValue = getAvailablePermits(lh); + assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle); } - for (int i = 0; i < 10000; i++) { + for (int i = 0; i < 2000; i++) { ByteBuffer entry = ByteBuffer.allocate(4); entry.putInt(rng.nextInt(maxInt)); entry.position(0); @@ -268,6 +320,12 @@ public class BookieReadWriteTest extends entries.add(entry.array()); entriesSize.add(entry.array().length); lh.asyncAddEntry(entry.array(), this, sync); + + /* + * Check that the difference is no larger than the throttling threshold + */ + int testValue = getAvailablePermits(lh); + assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle); } // wait for all entries to be acknowledged @@ -290,35 +348,22 @@ public class BookieReadWriteTest extends assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1)); // read entries - lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync); - + sync.counter = 0; + for (int i = 0; i < numEntriesToWrite; i+=throttle) { + lh.asyncReadEntries(i, i + throttle - 1, tcb, (Object) sync); + int testValue = getAvailablePermits(lh); + assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle); + } + synchronized (sync) { - while (sync.value == false) { + while (sync.counter < numEntriesToWrite) { + LOG.info("Entries counter = " + sync.counter); sync.wait(); } } LOG.debug("*** READ COMPLETE ***"); - // at this point, LedgerSequence ls is filled with the returned - // values - int i = 0; - while (ls.hasMoreElements()) { - ByteBuffer origbb = ByteBuffer.wrap(entries.get(i)); - Integer origEntry = origbb.getInt(); - byte[] entry = ls.nextElement().getEntry(); - ByteBuffer result = ByteBuffer.wrap(entry); - LOG.debug("Length of result: " + result.capacity()); - LOG.debug("Original entry: " + origEntry); - - Integer retrEntry = result.getInt(); - LOG.debug("Retrieved entry: " + retrEntry); - assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry)); - assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue()); - i++; - } - assertTrue("Checking number of read entries", i == numEntriesToWrite); - lh.close(); } catch (KeeperException e) { LOG.error("Test failed", e); @@ -565,7 +610,10 @@ public class BookieReadWriteTest extends } public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { + if(rc != BKException.Code.OK) fail("Return code is not OK: " + rc); + SyncObj x = (SyncObj) ctx; + synchronized (x) { x.counter++; x.notify(); @@ -573,12 +621,14 @@ public class BookieReadWriteTest extends } public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) { + if(rc != BKException.Code.OK) fail("Return code is not OK: " + rc); + ls = seq; + synchronized (sync) { sync.value = true; sync.notify(); } - } @Before