Author: mahadev
Date: Fri Apr 17 21:28:30 2009
New Revision: 766160

URL: http://svn.apache.org/viewvc?rev=766160&view=rev
Log:
ZOOKEEPER-373. One thread per bookie (flavio via mahadev)

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=766160&r1=766159&r2=766160&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Apr 17 21:28:30 2009
@@ -99,6 +99,8 @@
 ZOOKEEPER-361. integrate cppunit testing as part of hudson patch process. (giri via mahadev)
+ ZOOKEEPER-373. 
One thread per bookie (flavio via mahadev) + NEW FEATURES: Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java?rev=766160&r1=766159&r2=766160&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java Fri Apr 17 21:28:30 2009 @@ -22,6 +22,7 @@ import java.io.IOException; +import java.net.ConnectException; import java.io.ByteArrayOutputStream; import java.security.NoSuchAlgorithmException; import java.nio.ByteBuffer; @@ -35,6 +36,7 @@ import java.net.InetSocketAddress; import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookieHandle; import org.apache.bookkeeper.client.LedgerSequence; import org.apache.bookkeeper.client.BKException.Code; import org.apache.bookkeeper.client.LedgerHandle.QMode; @@ -77,8 +79,6 @@ ZooKeeper zk = null; QuorumEngine engine = null; - MessageDigest md = null; - //HashMap<Long, ArrayBlockingQueue<Operation> > qeMap; HashMap<Long, QuorumEngine> engines; HashSet<InetSocketAddress> bookieBlackList; @@ -386,6 +386,7 @@ List<String> list = zk.getChildren(prefix + getZKStringId(lId) + ensemble, false); + LOG.info("Length of list of bookies: " + list.size()); for(int i = 0 ; i < list.size() ; i++){ for(String s : list){ byte[] bindex = zk.getData(prefix + getZKStringId(lId) + ensemble + "/" + s, false, stat); @@ -421,11 +422,6 @@ return addr; } - public void initMessageDigest(String alg) - throws NoSuchAlgorithmException { - md = MessageDigest.getInstance(alg); - } - /** * Add entry synchronously to an open ledger. 
* @@ -644,6 +640,38 @@ return null; } + HashMap<InetSocketAddress, BookieHandle> bhMap = + new HashMap<InetSocketAddress, BookieHandle>(); + + /** + * Keeps a list of available BookieHandle objects and returns + * the corresponding object given an address. + * + * @param a InetSocketAddress + */ + + synchronized BookieHandle getBookieHandle(InetSocketAddress a) + throws ConnectException, IOException { + if(!bhMap.containsKey(a)){ + bhMap.put(a, new BookieHandle(a)); + } + bhMap.get(a).incRefCount(); + + return bhMap.get(a); + } + + /** + * When there are no more references to a BookieHandle, + * remove it from the list. + */ + + synchronized void haltBookieHandles(ArrayList<BookieHandle> bookies){ + for(BookieHandle bh : bookies){ + if(bh.halt() <= 0) + bhMap.remove(bh.addr); + } + } + /** * Blacklists bookies. * Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java?rev=766160&r1=766159&r2=766160&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java Fri Apr 17 21:28:30 2009 @@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit; import java.security.NoSuchAlgorithmException; import java.security.InvalidKeyException; -import java.security.MessageDigest; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; @@ -55,22 +54,24 @@ Logger LOG = Logger.getLogger(BookieClient.class); boolean stop = false; - LedgerHandle self; - BookieClient client; + private BookieClient client; InetSocketAddress addr; static int recvTimeout = 2000; - ArrayBlockingQueue<ToSend> incomingQueue; + private ArrayBlockingQueue<ToSend> 
incomingQueue; + private int refCount = 0; /** * Objects of this class are queued waiting to be * processed. */ class ToSend { + LedgerHandle lh; long entry = -1; Object ctx; int type; - ToSend(SubOp sop, long entry){ + ToSend(LedgerHandle lh, SubOp sop, long entry){ + this.lh = lh; this.type = sop.op.type; this.entry = entry; this.ctx = sop; @@ -78,12 +79,10 @@ } /** - * @param lh ledger handle * @param addr address */ - BookieHandle(LedgerHandle lh, InetSocketAddress addr) throws IOException { + BookieHandle(InetSocketAddress addr) throws IOException { this.client = new BookieClient(addr, recvTimeout); - this.self = lh; this.addr = addr; this.incomingQueue = new ArrayBlockingQueue<ToSend>(2000); @@ -110,40 +109,26 @@ * @param ctx * @throws IOException */ - public void sendAdd(SubAddOp r, long entry) + public void sendAdd(LedgerHandle lh, SubAddOp r, long entry) throws IOException { try{ - incomingQueue.put(new ToSend(r, entry)); + incomingQueue.put(new ToSend(lh, r, entry)); } catch(InterruptedException e){ e.printStackTrace(); } } /** - * Message disgest instance + * MAC instance * */ - MessageDigest digest = null; Mac mac = null; - /** - * Get digest instance if there is none. 
- * - */ - MessageDigest getDigestInstance(String alg) - throws NoSuchAlgorithmException { - if(digest == null){ - digest = MessageDigest.getInstance(alg); - } - - return digest; - } - - Mac getMac(String alg) + Mac getMac(byte[] macKey, String alg) throws NoSuchAlgorithmException, InvalidKeyException { if(mac == null){ mac = Mac.getInstance(alg); - mac.init(new SecretKeySpec(self.getMacKey(), "HmacSHA1")); + mac.init(new SecretKeySpec(macKey, "HmacSHA1")); } return mac; @@ -158,11 +143,11 @@ * @param ctx * @throws IOException */ - public void sendRead(SubReadOp r, long entry) + + public void sendRead(LedgerHandle lh, SubReadOp r, long entry) throws IOException { - //LOG.debug("readEntry: " + entry); try{ - incomingQueue.put(new ToSend(r, entry)); + incomingQueue.put(new ToSend(lh, r, entry)); } catch(InterruptedException e){ e.printStackTrace(); } @@ -173,6 +158,7 @@ try{ ToSend ts = incomingQueue.poll(1000, TimeUnit.MILLISECONDS); if(ts != null){ + LedgerHandle self = ts.lh; switch(ts.type){ case Operation.ADD: SubAddOp aOp = (SubAddOp) ts.ctx; @@ -198,7 +184,7 @@ //extendedData.limit(extendedData.capacity() - 20); extendedData.position(extendedData.capacity() - 20); if(mac == null) - getMac("HmacSHA1"); + getMac(self.getMacKey(), "HmacSHA1"); extendedData.put(mac.doFinal(toProcess)); extendedData.position(16); } else { @@ -235,8 +221,27 @@ } } - void halt(){ - stop = true; + /** + * Multiple ledgers may use the same BookieHandle object, so we keep + * a count on the number of references. + */ + int incRefCount(){ + return ++refCount; + } + + /** + * Halts if there is no ledger using this object. 
+ */ + int halt(){ + int currentCount = --refCount; + if(currentCount <= 0){ + stop = true; + } + + if(currentCount < 0) + LOG.warn("Miscalculated the number of reference counts: " + addr); + + return currentCount; } } Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=766160&r1=766159&r2=766160&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Apr 17 21:28:30 2009 @@ -118,24 +118,24 @@ private void setBookies(ArrayList<InetSocketAddress> bookies) throws InterruptedException { - for(InetSocketAddress a : bookies){ - LOG.debug("Opening bookieHandle: " + a); - try{ - BookieHandle bh = new BookieHandle(this, a); - this.bookies.add(bh); - } catch(ConnectException e){ - LOG.error(e + "(bookie: " + a + ")"); + try{ + for(InetSocketAddress a : bookies){ + LOG.debug("Opening bookieHandle: " + a); + + //BookieHandle bh = new BookieHandle(this, a); + this.bookies.add(bk.getBookieHandle(a)); + } + } catch(ConnectException e){ + LOG.error(e); - InetSocketAddress addr = null; - addr = bk.getNewBookie(bookies); + InetSocketAddress addr = bk.getNewBookie(bookies); if(addr != null){ - bookies.add(addr); + bookies.add(addr); } - } catch(IOException e) { - LOG.error(e); - } - } + } catch(IOException e) { + LOG.error(e); + } } @@ -147,8 +147,9 @@ */ int addBookie(InetSocketAddress addr) throws IOException { - BookieHandle bh = new BookieHandle(this, addr); - this.bookies.add(bh); + LOG.info("My address: " + addr.toString()); + //BookieHandle bh = new BookieHandle(this, addr); + this.bookies.add(bk.getBookieHandle(addr)); 
if(bookies.size() > qSize) setThreshold(); @@ -190,7 +191,7 @@ throw BKException.create(Code.NoBookieAvailableException); } else { try{ - BookieHandle bh = new BookieHandle(this, addr); + //BookieHandle bh = new BookieHandle(this, addr); /* * TODO: Read from current bookies, and write to this one @@ -199,7 +200,7 @@ /* * If successful in writing to new bookie, add it to the set */ - this.bookies.set(index, bh); + this.bookies.set(index, bk.getBookieHandle(addr)); } catch(ConnectException e){ bk.blackListBookie(addr); LOG.error(e); @@ -224,9 +225,7 @@ void close(){ ledger = -1; last = -1; - for(BookieHandle bh : bookies){ - bh.halt(); - } + bk.haltBookieHandles(bookies); } Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java?rev=766160&r1=766159&r2=766160&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java Fri Apr 17 21:28:30 2009 @@ -224,8 +224,7 @@ pROp, index, opMonitor); - lh.getBookies().get((index) % n).sendRead(sRead, entry); - + lh.getBookies().get((index) % n).sendRead(lh, sRead, entry); } catch(IOException e){ LOG.error(e); } @@ -248,7 +247,7 @@ pOp, index, opMonitor); - lh.getBookies().get((index) % n).sendAdd(sAdd, aOp.entry); + lh.getBookies().get((index) % n).sendAdd(lh, sAdd, aOp.entry); } catch (IOException io) { LOG.error(io); try{ Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java?rev=766160&r1=766159&r2=766160&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java Fri Apr 17 21:28:30 2009 @@ -204,7 +204,7 @@ if(!ids.contains(Integer.valueOf(i))){ // and send it to new bookie try{ - list.get(i).sendAdd(new SubAddOp(sAdd.op, + list.get(i).sendAdd(lh, new SubAddOp(sAdd.op, pOp, i, this), ((AddOp) sAdd.op).entry); @@ -234,7 +234,6 @@ * Collect responses, and reply when there are sufficient * answers. */ - if(rc == 0){ SubReadOp sRead = (SubReadOp) ctx; ReadOp rOp = (ReadOp) sRead.op; @@ -274,7 +273,6 @@ break; case GENERIC: list = pOp.proposedValues; - LOG.debug("List length before: " + list.size()); synchronized(list){ if(rOp.seq[(int) (entryId % (rOp.lastEntry - rOp.firstEntry + 1))] == null){ @@ -317,10 +315,10 @@ if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) && !sRead.op.isReady()){ - sRead.op.setReady(); } + long diff = rOp.lastEntry - rOp.firstEntry; //LOG.debug("Counter: " + rOp.counter + ", " + diff); } @@ -394,8 +392,7 @@ bb.rewind(); byte[] msgDigest = mac.doFinal(data); - if(Arrays.equals(mac.doFinal(data), sig)){ - + if(Arrays.equals(msgDigest, sig)){ return bb; } else { LOG.error("Entry id: " + new String(msgDigest) + new String(sig)); Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=766160&r1=766159&r2=766160&view=diff ============================================================================== --- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java Fri Apr 17 21:28:30 2009 @@ -87,7 +87,7 @@ Integer initialPort = 5000; BookKeeper bkc; // bookkeeper client byte[] ledgerPassword = "aaa".getBytes(); - LedgerHandle lh; + LedgerHandle lh, lh2; long ledgerId; LedgerSequence ls; @@ -117,7 +117,7 @@ // Create a BookKeeper client and a ledger bkc = new BookKeeper("127.0.0.1"); lh = bkc.createLedger(ledgerPassword); - bkc.initMessageDigest("SHA1"); + //bkc.initMessageDigest("SHA1"); ledgerId = lh.getId(); LOG.info("Ledger ID: " + lh.getId()); for(int i = 0; i < numEntriesToWrite; i++){ @@ -138,11 +138,11 @@ } } - LOG.debug("*** WRITE COMPLETED ***"); + LOG.debug("*** WRITE COMPLETE ***"); // close ledger bkc.closeLedger(lh); - //*** WRITING PART COMPLETED // READ PART BEGINS *** + //*** WRITING PART COMPLETE // READ PART BEGINS *** // open ledger lh = bkc.openLedger(ledgerId, ledgerPassword); @@ -160,7 +160,7 @@ assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite); - LOG.debug("*** READ COMPLETED ***"); + LOG.debug("*** READ COMPLETE ***"); // at this point, LedgerSequence ls is filled with the returned values int i = 0; @@ -185,9 +185,9 @@ e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); - } catch (NoSuchAlgorithmException e) { - e.printStackTrace(); - } + } //catch (NoSuchAlgorithmException e) { + // e.printStackTrace(); + //} } @@ -200,7 +200,7 @@ // Create a BookKeeper client and a ledger bkc = new BookKeeper("127.0.0.1"); lh = bkc.createLedger(ledgerPassword); - bkc.initMessageDigest("SHA1"); + //bkc.initMessageDigest("SHA1"); ledgerId = lh.getId(); LOG.info("Ledger ID: " + lh.getId()); for(int i = 0; i < numEntriesToWrite; i++){ @@ -218,7 +218,7 @@ } } - LOG.debug("*** ASYNC WRITE COMPLETED ***"); + LOG.debug("*** ASYNC WRITE COMPLETE ***"); // 
close ledger bkc.closeLedger(lh); @@ -234,7 +234,7 @@ assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite); - LOG.debug("*** SYNC READ COMPLETED ***"); + LOG.debug("*** SYNC READ COMPLETE ***"); // at this point, LedgerSequence ls is filled with the returned values int i = 0; @@ -260,18 +260,19 @@ e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); - } catch (NoSuchAlgorithmException e) { - e.printStackTrace(); - } + } //catch (NoSuchAlgorithmException e) { + // e.printStackTrace(); + //} } + @Test public void testReadWriteSyncSingleClient() throws IOException { try { // Create a BookKeeper client and a ledger bkc = new BookKeeper("127.0.0.1"); lh = bkc.createLedger(ledgerPassword); - bkc.initMessageDigest("SHA1"); + //bkc.initMessageDigest("SHA1"); ledgerId = lh.getId(); LOG.info("Ledger ID: " + lh.getId()); for(int i = 0; i < numEntriesToWrite; i++){ @@ -306,17 +307,18 @@ e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); - } catch (NoSuchAlgorithmException e) { - e.printStackTrace(); - } + } //catch (NoSuchAlgorithmException e) { + // e.printStackTrace(); + //} } + @Test public void testReadWriteZero() throws IOException { try { // Create a BookKeeper client and a ledger bkc = new BookKeeper("127.0.0.1"); lh = bkc.createLedger(ledgerPassword); - bkc.initMessageDigest("SHA1"); + //bkc.initMessageDigest("SHA1"); ledgerId = lh.getId(); LOG.info("Ledger ID: " + lh.getId()); for(int i = 0; i < numEntriesToWrite; i++){ @@ -352,11 +354,71 @@ e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); - } catch (NoSuchAlgorithmException e) { - e.printStackTrace(); - } + } //catch (NoSuchAlgorithmException e) { + // e.printStackTrace(); + //} } + @Test + public void testMultiLedger() throws IOException { + try { + // Create a BookKeeper client and a ledger + bkc = new BookKeeper("127.0.0.1"); + lh = bkc.createLedger(ledgerPassword); + lh2 = bkc.createLedger(ledgerPassword); + + 
long ledgerId = lh.getId(); + long ledgerId2 = lh2.getId(); + + //bkc.initMessageDigest("SHA1"); + LOG.info("Ledger ID 1: " + lh.getId() + ", Ledger ID 2: " + lh2.getId()); + for(int i = 0; i < numEntriesToWrite; i++){ + bkc.addEntry(lh, new byte[0]); + bkc.addEntry(lh2, new byte[0]); + } + + bkc.closeLedger(lh); + bkc.closeLedger(lh2); + + lh = bkc.openLedger(ledgerId, ledgerPassword); + lh2 = bkc.openLedger(ledgerId2, ledgerPassword); + + LOG.debug("Number of entries written: " + lh.getLast() + ", " + lh2.getLast()); + assertTrue("Verifying number of entries written lh (" + lh.getLast() + ")" , lh.getLast() == numEntriesToWrite); + assertTrue("Verifying number of entries written lh2 (" + lh2.getLast() + ")", lh2.getLast() == numEntriesToWrite); + + ls = bkc.readEntries(lh, 0, numEntriesToWrite - 1); + int i = 0; + while(ls.hasMoreElements()){ + ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry()); + LOG.debug("Length of result: " + result.capacity()); + + assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0); + } + bkc.closeLedger(lh); + + ls = bkc.readEntries(lh2, 0, numEntriesToWrite - 1); + i = 0; + while(ls.hasMoreElements()){ + ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry()); + LOG.debug("Length of result: " + result.capacity()); + + assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0); + } + + bkc.closeLedger(lh2); + + } catch (KeeperException e) { + e.printStackTrace(); + } catch (BKException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } //catch (NoSuchAlgorithmException e) { + // e.printStackTrace(); + //} + } + public void addComplete(int rc, long ledgerId, long entryId, Object ctx) { SyncObj x = (SyncObj) ctx; @@ -451,7 +513,7 @@ } protected void tearDown(){ - LOG.debug("TearDown"); + LOG.info("TearDown"); //shutdown bookie servers try { @@ -478,7 +540,7 @@ /* Clean up a directory recursively */ protected boolean 
cleanUpDir(File dir){ if (dir.isDirectory()) { - System.err.println("Cleaning up " + dir.getName()); + LOG.info("Cleaning up " + dir.getName()); String[] children = dir.list(); for (String string : children) { boolean success = cleanUpDir(new File(dir, string));