Author: breed
Date: Fri Sep 17 16:58:07 2010
New Revision: 998200

URL: http://svn.apache.org/viewvc?rev=998200&view=rev
Log:
ZOOKEEPER-831. BookKeeper: Throttling improved for reads

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
    
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
    
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=998200&r1=998199&r2=998200&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Sep 17 16:58:07 2010
@@ -96,6 +96,8 @@ BUGFIXES: 
 
   ZOOKEEPER-870. Zookeeper trunk build broken. (mahadev via phunt)
 
+  ZOOKEEPER-831. BookKeeper: Throttling improved for reads (breed via fpj)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

Modified: 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=998200&r1=998199&r2=998200&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
 Fri Sep 17 16:58:07 2010
@@ -67,7 +67,7 @@ public class LedgerHandle implements Rea
   private Integer throttling = 5000;
   
   final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();
-
+  
   LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
       DigestType digestType, byte[] password)
       throws GeneralSecurityException, NumberFormatException {
@@ -149,6 +149,15 @@ public class LedgerHandle implements Rea
   }
   
   /**
+   * Return total number of available slots.
+   * 
+   * @return int    available slots
+   */
+  Semaphore getAvailablePermits(){
+      return this.opCounterSem;
+  }
+  
+  /**
    * Get the Distribution Schedule
    * 
    * @return DistributionSchedule for the LedgerHandle
@@ -277,7 +286,6 @@ public class LedgerHandle implements Rea
     }
 
     new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
-    opCounterSem.acquire();
   }
 
   /**
@@ -310,26 +318,32 @@ public class LedgerHandle implements Rea
    */
   public void asyncAddEntry(final byte[] data, final AddCallback cb,
       final Object ctx) throws InterruptedException {
-      bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
-      @Override
-      public void safeRun() {
-        if (metadata.isClosed()) {
-          LOG.warn("Attempt to add to closed ledger: " + ledgerId);
-          cb.addComplete(BKException.Code.LedgerClosedException,
-              LedgerHandle.this, -1, ctx);
-          return;
-        }
-
-        long entryId = ++lastAddPushed;
-        PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, 
entryId);
-        pendingAddOps.add(op);
-        ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
-            entryId, lastAddConfirmed, data);
-        op.initiate(toSend);
-
-      }
-      });
       opCounterSem.acquire();
+      
+      try{
+          bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+              @Override
+              public void safeRun() {
+                  if (metadata.isClosed()) {
+                      LOG.warn("Attempt to add to closed ledger: " + ledgerId);
+                      LedgerHandle.this.opCounterSem.release();
+                      cb.addComplete(BKException.Code.LedgerClosedException,
+                              LedgerHandle.this, -1, ctx);
+                      return;
+                  }
+
+                  long entryId = ++lastAddPushed;
+                  PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, 
ctx, entryId);
+                  pendingAddOps.add(op);
+                  ChannelBuffer toSend = 
macManager.computeDigestAndPackageForSending(
+                          entryId, lastAddConfirmed, data);
+                  op.initiate(toSend);
+              }
+          });
+      } catch (RuntimeException e) {
+          opCounterSem.release();
+          throw e;
+      }
   }
 
   // close the ledger and send fails to all the adds in the pipeline

Modified: 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=998200&r1=998199&r2=998200&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
 Fri Sep 17 16:58:07 2010
@@ -64,12 +64,18 @@ class PendingReadOp implements Enumerati
         numPendingReads = endEntryId - startEntryId + 1;
     }
 
-    public void initiate() {
+    public void initiate() throws InterruptedException {
         long nextEnsembleChange = startEntryId, i = startEntryId;
 
         ArrayList<InetSocketAddress> ensemble = null;
         do {
 
+            if(LOG.isDebugEnabled()){
+                LOG.debug("Acquiring lock: " + i);
+            }
+           
+            lh.opCounterSem.acquire();
+            
             if (i == nextEnsembleChange) {
                 ensemble = lh.metadata.getEnsemble(i);
                 nextEnsembleChange = lh.metadata.getNextEnsembleChange(i);
@@ -80,7 +86,6 @@ class PendingReadOp implements Enumerati
             sendRead(ensemble, entry, BKException.Code.ReadException);
 
         } while (i <= endEntryId);
-
     }
 
     void sendRead(ArrayList<InetSocketAddress> ensemble, LedgerEntry entry, 
int lastErrorCode) {
@@ -114,7 +119,6 @@ class PendingReadOp implements Enumerati
             return;
         }
         
-        numPendingReads--;
         ChannelBufferInputStream is;
         try {
             is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
@@ -125,15 +129,23 @@ class PendingReadOp implements Enumerati
 
         entry.entryDataStream = is;
 
+        numPendingReads--;
         if (numPendingReads == 0) {
             submitCallback(BKException.Code.OK);
         }
-
+        
+        if(LOG.isDebugEnabled()){
+            LOG.debug("Releasing lock: " + entryId);
+        }
+        
+        lh.opCounterSem.release();
+        
+        if(numPendingReads < 0)
+            LOG.error("Read too many values");
     }
 
     private void submitCallback(int code){
         cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
-        lh.opCounterSem.release();
     }
     public boolean hasMoreElements() {
         return !seq.isEmpty();

Modified: 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=998200&r1=998199&r2=998200&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
 Fri Sep 17 16:58:07 2010
@@ -23,12 +23,17 @@ package org.apache.bookkeeper.test;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.NoSuchFieldException;
+import java.lang.IllegalAccessException;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Semaphore;
+
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
@@ -83,7 +88,7 @@ public class BookieReadWriteTest extends
     Set<Object> syncObjs;
 
     class SyncObj {
-        int counter;
+        volatile int counter;
         boolean value;
 
         public SyncObj() {
@@ -237,19 +242,61 @@ public class BookieReadWriteTest extends
         }
     }
 
+    class ThrottleTestCallback implements ReadCallback {
+        int throttle;
+        
+        ThrottleTestCallback(int threshold){
+            this.throttle = threshold;
+        }
+        
+        public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> seq, Object ctx){
+            if(rc != BKException.Code.OK){
+                fail("Return code is not OK: " + rc);
+            }
+        
+            ls = seq;
+            synchronized(sync){
+                sync.counter += throttle;
+                sync.notify();
+            }
+            LOG.info("Current counter: " + sync.counter);
+        }
+    }
+    
+    /**
+     * Method for obtaining the available permits of a ledger handle
+     * using reflection to avoid adding a new public method to the
+     * class.
+     *   
+     * @param lh
+     * @return
+     */
+    @SuppressWarnings("unchecked")
+    int getAvailablePermits(LedgerHandle lh) throws
+    NoSuchFieldException, IllegalAccessException
+    { 
+        Field field = LedgerHandle.class.getDeclaredField("opCounterSem"); 
+        field.setAccessible(true); 
+        return ((Semaphore)field.get(lh)).availablePermits(); 
+    }
+    
     @Test
-    public void testReadWriteAsyncSingleClientThrottle() throws IOException {
+    public void testReadWriteAsyncSingleClientThrottle() throws 
+    IOException, NoSuchFieldException, IllegalAccessException {
         try {
+                       
+            Integer throttle = 100;
+            ThrottleTestCallback tcb = new ThrottleTestCallback(throttle);
             // Create a BookKeeper client and a ledger
-            System.setProperty("throttle", "1000");
+            System.setProperty("throttle", throttle.toString());
             bkc = new BookKeeper("127.0.0.1");
             lh = bkc.createLedger(digestType, ledgerPassword);
             // bkc.initMessageDigest("SHA1");
             ledgerId = lh.getId();
             LOG.info("Ledger ID: " + lh.getId());
             
-            numEntriesToWrite = 20000; 
-            for (int i = 0; i < (numEntriesToWrite - 10000); i++) {
+            numEntriesToWrite = 8000; 
+            for (int i = 0; i < (numEntriesToWrite - 2000); i++) {
                 ByteBuffer entry = ByteBuffer.allocate(4);
                 entry.putInt(rng.nextInt(maxInt));
                 entry.position(0);
@@ -257,10 +304,15 @@ public class BookieReadWriteTest extends
                 entries.add(entry.array());
                 entriesSize.add(entry.array().length);
                 lh.asyncAddEntry(entry.array(), this, sync);
+                /*
+                 * Check that the difference is no larger than the throttling 
threshold
+                 */
+                int testValue = getAvailablePermits(lh);
+                assertTrue("Difference is incorrect : " + i + ", " + 
sync.counter + ", " + testValue, testValue <= throttle);
             }
             
 
-            for (int i = 0; i < 10000; i++) {
+            for (int i = 0; i < 2000; i++) {
                 ByteBuffer entry = ByteBuffer.allocate(4);
                 entry.putInt(rng.nextInt(maxInt));
                 entry.position(0);
@@ -268,6 +320,12 @@ public class BookieReadWriteTest extends
                 entries.add(entry.array());
                 entriesSize.add(entry.array().length);
                 lh.asyncAddEntry(entry.array(), this, sync);
+                
+                /*
+                 * Check that the difference is no larger than the throttling 
threshold
+                 */
+                int testValue = getAvailablePermits(lh);
+                assertTrue("Difference is incorrect : " + i + ", " + 
sync.counter + ", " + testValue, testValue <= throttle);
             }
             
             // wait for all entries to be acknowledged
@@ -290,35 +348,22 @@ public class BookieReadWriteTest extends
             assertTrue("Verifying number of entries written", 
lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
 
             // read entries
-            lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
-
+            sync.counter = 0;
+            for (int i = 0; i < numEntriesToWrite; i+=throttle) {
+                lh.asyncReadEntries(i, i + throttle - 1, tcb, (Object) sync);
+                int testValue = getAvailablePermits(lh);
+                assertTrue("Difference is incorrect : " + i + ", " + 
sync.counter + ", " + testValue, testValue <= throttle);
+            }
+            
             synchronized (sync) {
-                while (sync.value == false) {
+                while (sync.counter < numEntriesToWrite) {
+                    LOG.info("Entries counter = " + sync.counter);
                     sync.wait();
                 }
             }
 
             LOG.debug("*** READ COMPLETE ***");
 
-            // at this point, LedgerSequence ls is filled with the returned
-            // values
-            int i = 0;
-            while (ls.hasMoreElements()) {
-                ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
-                Integer origEntry = origbb.getInt();
-                byte[] entry = ls.nextElement().getEntry();
-                ByteBuffer result = ByteBuffer.wrap(entry);
-                LOG.debug("Length of result: " + result.capacity());
-                LOG.debug("Original entry: " + origEntry);
-
-                Integer retrEntry = result.getInt();
-                LOG.debug("Retrieved entry: " + retrEntry);
-                assertTrue("Checking entry " + i + " for equality", 
origEntry.equals(retrEntry));
-                assertTrue("Checking entry " + i + " for size", entry.length 
== entriesSize.get(i).intValue());
-                i++;
-            }
-            assertTrue("Checking number of read entries", i == 
numEntriesToWrite);
-
             lh.close();
         } catch (KeeperException e) {
             LOG.error("Test failed", e);
@@ -565,7 +610,10 @@ public class BookieReadWriteTest extends
     }
 
     public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) 
{
+        if(rc != BKException.Code.OK) fail("Return code is not OK: " + rc);
+        
         SyncObj x = (SyncObj) ctx;
+        
         synchronized (x) {
             x.counter++;
             x.notify();
@@ -573,12 +621,14 @@ public class BookieReadWriteTest extends
     }
 
     public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> 
seq, Object ctx) {
+        if(rc != BKException.Code.OK) fail("Return code is not OK: " + rc);
+        
         ls = seq;
+
         synchronized (sync) {
             sync.value = true;
             sync.notify();
         }
-
     }
 
     @Before


Reply via email to