Author: mahadev
Date: Fri Apr 17 21:28:30 2009
New Revision: 766160

URL: http://svn.apache.org/viewvc?rev=766160&view=rev
Log:
ZOOKEEPER-373. One thread per bookie (flavio via mahadev) 

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
    
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
    
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
    
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
    
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
    
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=766160&r1=766159&r2=766160&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Apr 17 21:28:30 2009
@@ -99,6 +99,8 @@
   ZOOKEEPER-361. integrate cppunit testing as part of hudson patch process.
   (giri via mahadev)
 
+  ZOOKEEPER-373. One thread per bookie (flavio via mahadev) 
+
 NEW FEATURES:
 
 

Modified: 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java?rev=766160&r1=766159&r2=766160&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
 Fri Apr 17 21:28:30 2009
@@ -22,6 +22,7 @@
 
 
 import java.io.IOException;
+import java.net.ConnectException;
 import java.io.ByteArrayOutputStream;
 import java.security.NoSuchAlgorithmException;
 import java.nio.ByteBuffer;
@@ -35,6 +36,7 @@
 import java.net.InetSocketAddress;
 
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookieHandle;
 import org.apache.bookkeeper.client.LedgerSequence;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.LedgerHandle.QMode;
@@ -77,8 +79,6 @@
     
     ZooKeeper zk = null;
     QuorumEngine engine = null;
-    MessageDigest md = null;
-    //HashMap<Long, ArrayBlockingQueue<Operation> > qeMap;
     HashMap<Long, QuorumEngine> engines;
     HashSet<InetSocketAddress> bookieBlackList;
     
@@ -386,6 +386,7 @@
         List<String> list = 
             zk.getChildren(prefix + getZKStringId(lId) + ensemble, false);
         
+        LOG.info("Length of list of bookies: " + list.size());
         for(int i = 0 ; i < list.size() ; i++){
             for(String s : list){
                 byte[] bindex = zk.getData(prefix + getZKStringId(lId) + 
ensemble + "/" + s, false, stat);
@@ -421,11 +422,6 @@
         return addr;
     }
     
-    public void initMessageDigest(String alg)
-    throws NoSuchAlgorithmException {
-        md = MessageDigest.getInstance(alg);
-    }
-    
     /**
      * Add entry synchronously to an open ledger.
      * 
@@ -644,6 +640,38 @@
         return null;
     }
     
+    HashMap<InetSocketAddress, BookieHandle> bhMap = 
+       new HashMap<InetSocketAddress, BookieHandle>();
+    
+    /**
+     *  Keeps a map of available BookieHandle objects and returns the handle
+     *  for the given address, creating it if absent and incrementing its reference count.
+     *  
+     *  @param a       InetSocketAddress
+     */
+    
+    synchronized BookieHandle getBookieHandle(InetSocketAddress a)
+    throws ConnectException, IOException {
+       if(!bhMap.containsKey(a)){
+               bhMap.put(a, new BookieHandle(a));
+       }
+       bhMap.get(a).incRefCount();
+       
+       return bhMap.get(a);
+    }
+    
+    /**
+     * Releases one reference on each given BookieHandle and, when a handle
+     * has no more references, removes it from the map.
+     */
+    
+    synchronized void haltBookieHandles(ArrayList<BookieHandle> bookies){
+        for(BookieHandle bh : bookies){
+            if(bh.halt() <= 0)
+                bhMap.remove(bh.addr);
+        }
+    }
+    
     /**
      * Blacklists bookies.
      * 

Modified: 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java?rev=766160&r1=766159&r2=766160&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
 Fri Apr 17 21:28:30 2009
@@ -28,7 +28,6 @@
 import java.util.concurrent.TimeUnit;
 import java.security.NoSuchAlgorithmException;
 import java.security.InvalidKeyException;
-import java.security.MessageDigest;
 import javax.crypto.Mac; 
 import javax.crypto.spec.SecretKeySpec;
 
@@ -55,22 +54,24 @@
     Logger LOG = Logger.getLogger(BookieClient.class);
     
     boolean stop = false;
-    LedgerHandle self;
-    BookieClient client;
+    private BookieClient client;
     InetSocketAddress addr;
     static int recvTimeout = 2000;
-    ArrayBlockingQueue<ToSend> incomingQueue;
+    private ArrayBlockingQueue<ToSend> incomingQueue;
+    private int refCount = 0;
     
     /**
      * Objects of this class are queued waiting to be
      * processed.
      */
     class ToSend {
+       LedgerHandle lh;
         long entry = -1;
         Object ctx;
         int type;
         
-        ToSend(SubOp sop, long entry){
+        ToSend(LedgerHandle lh, SubOp sop, long entry){
+               this.lh = lh;
             this.type = sop.op.type;
             this.entry = entry;
             this.ctx = sop;
@@ -78,12 +79,10 @@
     }
     
     /**
-     * @param lh       ledger handle
      * @param addr     address
      */
-    BookieHandle(LedgerHandle lh, InetSocketAddress addr) throws IOException {
+    BookieHandle(InetSocketAddress addr) throws IOException {
         this.client = new BookieClient(addr, recvTimeout);
-        this.self = lh;
         this.addr = addr;
         this.incomingQueue = new ArrayBlockingQueue<ToSend>(2000);
         
@@ -110,40 +109,26 @@
      * @param ctx
      * @throws IOException
      */
-    public void sendAdd(SubAddOp r, long entry)
+    public void sendAdd(LedgerHandle lh, SubAddOp r, long entry)
     throws IOException {
         try{
-            incomingQueue.put(new ToSend(r, entry));
+            incomingQueue.put(new ToSend(lh, r, entry));
         } catch(InterruptedException e){
             e.printStackTrace();
         }
     }
     
     /**
-     * Message disgest instance
+     * MAC instance
      * 
      */
-    MessageDigest digest = null;
     Mac mac = null;
     
-    /** 
-     * Get digest instance if there is none.
-     * 
-     */
-    MessageDigest getDigestInstance(String alg)
-    throws NoSuchAlgorithmException {
-        if(digest == null){
-            digest = MessageDigest.getInstance(alg);
-        }
-        
-        return digest;
-    }
-    
-    Mac getMac(String alg)
+    Mac getMac(byte[] macKey, String alg)
     throws NoSuchAlgorithmException, InvalidKeyException {
         if(mac == null){
             mac = Mac.getInstance(alg);
-            mac.init(new SecretKeySpec(self.getMacKey(), "HmacSHA1"));
+            mac.init(new SecretKeySpec(macKey, "HmacSHA1"));
         }
         
         return mac;
@@ -158,11 +143,11 @@
      * @param ctx
      * @throws IOException
      */
-    public void sendRead(SubReadOp r, long entry)
+    
+    public void sendRead(LedgerHandle lh, SubReadOp r, long entry)
     throws IOException {
-        //LOG.debug("readEntry: " + entry);
         try{
-            incomingQueue.put(new ToSend(r, entry));
+            incomingQueue.put(new ToSend(lh, r, entry));
         } catch(InterruptedException e){
             e.printStackTrace();
         }
@@ -173,6 +158,7 @@
             try{
                 ToSend ts = incomingQueue.poll(1000, TimeUnit.MILLISECONDS);
                 if(ts != null){
+                       LedgerHandle self = ts.lh;
                     switch(ts.type){
                     case Operation.ADD:
                         SubAddOp aOp = (SubAddOp) ts.ctx;
@@ -198,7 +184,7 @@
                             //extendedData.limit(extendedData.capacity() - 20);
                             extendedData.position(extendedData.capacity() - 
20);
                             if(mac == null)
-                                getMac("HmacSHA1");
+                                getMac(self.getMacKey(), "HmacSHA1");
                             extendedData.put(mac.doFinal(toProcess));
                             extendedData.position(16);
                         } else {
@@ -235,8 +221,27 @@
         }
     }
     
-    void halt(){
-        stop = true;
+    /**
+     * Multiple ledgers may use the same BookieHandle object, so we keep
+     * a count on the number of references.
+     */
+    int incRefCount(){
+        return ++refCount;
+    }
+    
+    /**
+     * Decrements the reference count and halts once no ledger is using this object.
+     */
+    int halt(){
+        int currentCount = --refCount;
+        if(currentCount <= 0){
+            stop = true;
+        }
+        
+        if(currentCount < 0)
+            LOG.warn("Miscalculated the number of reference counts: " + addr);
+        
+        return currentCount;
     }
 }
 

Modified: 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=766160&r1=766159&r2=766160&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
 Fri Apr 17 21:28:30 2009
@@ -118,24 +118,24 @@
     
     private void setBookies(ArrayList<InetSocketAddress> bookies)
     throws InterruptedException {
-        for(InetSocketAddress a : bookies){
-            LOG.debug("Opening bookieHandle: " + a);
-            try{
-                BookieHandle bh = new BookieHandle(this, a);
-                this.bookies.add(bh);
-            } catch(ConnectException e){
-                LOG.error(e + "(bookie: " + a + ")");
+       try{
+               for(InetSocketAddress a : bookies){
+                       LOG.debug("Opening bookieHandle: " + a);
+            
+                       //BookieHandle bh = new BookieHandle(this, a);
+                       this.bookies.add(bk.getBookieHandle(a));
+               }
+       } catch(ConnectException e){
+               LOG.error(e);
                 
-                InetSocketAddress addr = null;
-                addr = bk.getNewBookie(bookies);
+               InetSocketAddress addr = bk.getNewBookie(bookies);
                 
                 if(addr != null){
-                    bookies.add(addr);
+                       bookies.add(addr);
                 }
-            } catch(IOException e) {
-                LOG.error(e);
-            }
-        }
+       } catch(IOException e) {
+               LOG.error(e);
+       }
     }
     
     
@@ -147,8 +147,9 @@
      */
     int addBookie(InetSocketAddress addr)
     throws IOException {
-        BookieHandle bh = new BookieHandle(this, addr);
-        this.bookies.add(bh);
+        LOG.info("My address: " + addr.toString());
+        //BookieHandle bh = new BookieHandle(this, addr);
+        this.bookies.add(bk.getBookieHandle(addr));
         
         if(bookies.size() > qSize) setThreshold();
         
@@ -190,7 +191,7 @@
             throw BKException.create(Code.NoBookieAvailableException);
         } else {           
             try{
-                BookieHandle bh = new BookieHandle(this, addr);
+                //BookieHandle bh = new BookieHandle(this, addr);
                 
                 /*
                  * TODO: Read from current bookies, and write to this one
@@ -199,7 +200,7 @@
                 /*
                  * If successful in writing to new bookie, add it to the set
                  */
-                this.bookies.set(index, bh);
+                this.bookies.set(index, bk.getBookieHandle(addr));
             } catch(ConnectException e){
                 bk.blackListBookie(addr);
                 LOG.error(e);
@@ -224,9 +225,7 @@
     void close(){
         ledger = -1;
         last = -1;
-        for(BookieHandle bh : bookies){
-            bh.halt();
-        }
+        bk.haltBookieHandles(bookies);
     }
     
     

Modified: 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java?rev=766160&r1=766159&r2=766160&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
 Fri Apr 17 21:28:30 2009
@@ -224,8 +224,7 @@
                                 pROp, 
                                 index,
                                 opMonitor);
-                        lh.getBookies().get((index) % n).sendRead(sRead, 
entry);
-                                    
+                        lh.getBookies().get((index) % n).sendRead(lh, sRead, 
entry);            
                     } catch(IOException e){
                         LOG.error(e);
                     }
@@ -248,7 +247,7 @@
                             pOp, 
                             index,
                             opMonitor);
-                    lh.getBookies().get((index) % n).sendAdd(sAdd, aOp.entry);
+                    lh.getBookies().get((index) % n).sendAdd(lh, sAdd, 
aOp.entry);
                 } catch (IOException io) {
                     LOG.error(io);
                     try{

Modified: 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java?rev=766160&r1=766159&r2=766160&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
 Fri Apr 17 21:28:30 2009
@@ -204,7 +204,7 @@
                     if(!ids.contains(Integer.valueOf(i))){
                         // and send it to new bookie
                         try{
-                            list.get(i).sendAdd(new SubAddOp(sAdd.op, 
+                            list.get(i).sendAdd(lh, new SubAddOp(sAdd.op, 
                                     pOp, 
                                     i, 
                                     this), ((AddOp) sAdd.op).entry);
@@ -234,7 +234,6 @@
          * Collect responses, and reply when there are sufficient 
          * answers.
          */
-        
         if(rc == 0){
             SubReadOp sRead = (SubReadOp) ctx;
             ReadOp rOp = (ReadOp) sRead.op;
@@ -274,7 +273,6 @@
                         break;
                     case GENERIC:
                         list = pOp.proposedValues;
-                        LOG.debug("List length before: " + list.size());
                         
                         synchronized(list){
                             if(rOp.seq[(int) (entryId % (rOp.lastEntry - 
rOp.firstEntry + 1))] == null){
@@ -317,10 +315,10 @@
         
                     if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) && 
                             !sRead.op.isReady()){
-                        
                         sRead.op.setReady();
                     }
             
+                    
                     long diff = rOp.lastEntry - rOp.firstEntry;
                     //LOG.debug("Counter: " + rOp.counter + ", " + diff);
                 }
@@ -394,8 +392,7 @@
         bb.rewind();
         
         byte[] msgDigest = mac.doFinal(data);
-        if(Arrays.equals(mac.doFinal(data), sig)){
-            
+        if(Arrays.equals(msgDigest, sig)){
             return bb;
         } else {
             LOG.error("Entry id: " + new String(msgDigest) + new String(sig));

Modified: 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=766160&r1=766159&r2=766160&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
 Fri Apr 17 21:28:30 2009
@@ -87,7 +87,7 @@
        Integer initialPort = 5000;
        BookKeeper bkc; // bookkeeper client
        byte[] ledgerPassword = "aaa".getBytes();
-       LedgerHandle lh;
+       LedgerHandle lh, lh2;
        long ledgerId;
        LedgerSequence ls;
        
@@ -117,7 +117,7 @@
                        // Create a BookKeeper client and a ledger
                        bkc = new BookKeeper("127.0.0.1");
                        lh = bkc.createLedger(ledgerPassword);
-                       bkc.initMessageDigest("SHA1");
+                       //bkc.initMessageDigest("SHA1");
                        ledgerId = lh.getId();
                        LOG.info("Ledger ID: " + lh.getId());
                        for(int i = 0; i < numEntriesToWrite; i++){
@@ -138,11 +138,11 @@
                                }
                        }
                        
-                       LOG.debug("*** WRITE COMPLETED ***");
+                       LOG.debug("*** WRITE COMPLETE ***");
                        // close ledger 
                        bkc.closeLedger(lh);
                        
-                       //*** WRITING PART COMPLETED // READ PART BEGINS ***
+                       //*** WRITING PART COMPLETE // READ PART BEGINS ***
                        
                        // open ledger
                        lh = bkc.openLedger(ledgerId, ledgerPassword);
@@ -160,7 +160,7 @@
                        
                        assertTrue("Checking number of read entries", ls.size() 
== numEntriesToWrite);
                        
-                       LOG.debug("*** READ COMPLETED ***");
+                       LOG.debug("*** READ COMPLETE ***");
                        
                        // at this point, LedgerSequence ls is filled with the 
returned values
                        int i = 0;
@@ -185,9 +185,9 @@
                        e.printStackTrace();
                } catch (InterruptedException e) {
                        e.printStackTrace();
-               } catch (NoSuchAlgorithmException e) {
-                       e.printStackTrace();
-               }
+               } //catch (NoSuchAlgorithmException e) {
+               //      e.printStackTrace();
+               //}
                
        }
        
@@ -200,7 +200,7 @@
                        // Create a BookKeeper client and a ledger
                        bkc = new BookKeeper("127.0.0.1");
                        lh = bkc.createLedger(ledgerPassword);
-                       bkc.initMessageDigest("SHA1");
+                       //bkc.initMessageDigest("SHA1");
                        ledgerId = lh.getId();
                        LOG.info("Ledger ID: " + lh.getId());
                        for(int i = 0; i < numEntriesToWrite; i++){
@@ -218,7 +218,7 @@
                                }
                        }
                        
-                       LOG.debug("*** ASYNC WRITE COMPLETED ***");
+                       LOG.debug("*** ASYNC WRITE COMPLETE ***");
                        // close ledger 
                        bkc.closeLedger(lh);
                        
@@ -234,7 +234,7 @@
                        
                        assertTrue("Checking number of read entries", ls.size() 
== numEntriesToWrite);
                        
-                       LOG.debug("*** SYNC READ COMPLETED ***");
+                       LOG.debug("*** SYNC READ COMPLETE ***");
                        
                        // at this point, LedgerSequence ls is filled with the 
returned values
                        int i = 0;
@@ -260,18 +260,19 @@
                        e.printStackTrace();
                } catch (InterruptedException e) {
                        e.printStackTrace();
-               } catch (NoSuchAlgorithmException e) {
-                       e.printStackTrace();
-               }
+               } //catch (NoSuchAlgorithmException e) {
+               //      e.printStackTrace();
+               //}
                
        }
     
+    @Test
     public void testReadWriteSyncSingleClient() throws IOException {
                try {
                        // Create a BookKeeper client and a ledger
                        bkc = new BookKeeper("127.0.0.1");
                        lh = bkc.createLedger(ledgerPassword);
-                       bkc.initMessageDigest("SHA1");
+                       //bkc.initMessageDigest("SHA1");
                        ledgerId = lh.getId();
                        LOG.info("Ledger ID: " + lh.getId());
                        for(int i = 0; i < numEntriesToWrite; i++){
@@ -306,17 +307,18 @@
                        e.printStackTrace();
                } catch (InterruptedException e) {
                        e.printStackTrace();
-               } catch (NoSuchAlgorithmException e) {
-                       e.printStackTrace();
-               }
+               } //catch (NoSuchAlgorithmException e) {
+               //      e.printStackTrace();
+               //}
        }
        
+    @Test
     public void testReadWriteZero() throws IOException {
                try {
                        // Create a BookKeeper client and a ledger
                        bkc = new BookKeeper("127.0.0.1");
                        lh = bkc.createLedger(ledgerPassword);
-                       bkc.initMessageDigest("SHA1");
+                       //bkc.initMessageDigest("SHA1");
                        ledgerId = lh.getId();
                        LOG.info("Ledger ID: " + lh.getId());
                        for(int i = 0; i < numEntriesToWrite; i++){             
                
@@ -352,11 +354,71 @@
                        e.printStackTrace();
                } catch (InterruptedException e) {
                        e.printStackTrace();
-               } catch (NoSuchAlgorithmException e) {
-                       e.printStackTrace();
-               }
+               } //catch (NoSuchAlgorithmException e) {
+               //      e.printStackTrace();
+               //}
        }
     
+    @Test
+    public void testMultiLedger() throws IOException {
+        try {
+            // Create a BookKeeper client and a ledger
+            bkc = new BookKeeper("127.0.0.1");
+            lh = bkc.createLedger(ledgerPassword);
+            lh2 = bkc.createLedger(ledgerPassword);
+            
+            long ledgerId = lh.getId();
+            long ledgerId2 = lh2.getId();
+            
+            //bkc.initMessageDigest("SHA1");
+            LOG.info("Ledger ID 1: " + lh.getId() + ", Ledger ID 2: " + 
lh2.getId());
+            for(int i = 0; i < numEntriesToWrite; i++){             
+                bkc.addEntry(lh, new byte[0]);
+                bkc.addEntry(lh2, new byte[0]);
+            }
+            
+            bkc.closeLedger(lh);
+            bkc.closeLedger(lh2);
+            
+            lh = bkc.openLedger(ledgerId, ledgerPassword);
+            lh2 = bkc.openLedger(ledgerId2, ledgerPassword);
+            
+            LOG.debug("Number of entries written: " + lh.getLast() + ", " + 
lh2.getLast());
+            assertTrue("Verifying number of entries written lh (" + 
lh.getLast() + ")" , lh.getLast() == numEntriesToWrite);
+            assertTrue("Verifying number of entries written lh2 (" + 
lh2.getLast() + ")", lh2.getLast() == numEntriesToWrite);
+            
+            ls = bkc.readEntries(lh, 0, numEntriesToWrite - 1);
+            int i = 0;
+            while(ls.hasMoreElements()){
+                ByteBuffer result = 
ByteBuffer.wrap(ls.nextElement().getEntry());
+                LOG.debug("Length of result: " + result.capacity());
+                
+                assertTrue("Checking if entry " + i + " has zero bytes", 
result.capacity() == 0);
+            }
+            bkc.closeLedger(lh);
+            
+            ls = bkc.readEntries(lh2, 0, numEntriesToWrite - 1);
+            i = 0;
+            while(ls.hasMoreElements()){
+                ByteBuffer result = 
ByteBuffer.wrap(ls.nextElement().getEntry());
+                LOG.debug("Length of result: " + result.capacity());
+                
+                assertTrue("Checking if entry " + i + " has zero bytes", 
result.capacity() == 0);
+            }
+            
+            bkc.closeLedger(lh2);
+            
+        } catch (KeeperException e) {
+            e.printStackTrace();
+        } catch (BKException e) {
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } //catch (NoSuchAlgorithmException e) {
+           // e.printStackTrace();
+        //}
+    }
+    
     
        public void addComplete(int rc, long ledgerId, long entryId, Object 
ctx) {
                SyncObj x = (SyncObj) ctx;
@@ -451,7 +513,7 @@
        }
        
        protected void tearDown(){
-               LOG.debug("TearDown");
+               LOG.info("TearDown");
 
                //shutdown bookie servers 
                try {
@@ -478,7 +540,7 @@
        /*      Clean up a directory recursively */
        protected boolean cleanUpDir(File dir){
                if (dir.isDirectory()) {
-                       System.err.println("Cleaning up " + dir.getName());
+                       LOG.info("Cleaning up " + dir.getName());
             String[] children = dir.list();
             for (String string : children) {
                                boolean success = cleanUpDir(new File(dir, 
string));


Reply via email to