Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java?rev=903483&r1=903482&r2=903483&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java Tue Jan 26 23:16:45 2010 @@ -1,4 +1,5 @@ package org.apache.bookkeeper.test; + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,275 +21,54 @@ * */ - -import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; - -import java.lang.InterruptedException; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; - -import junit.framework.TestCase; - import org.junit.*; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.log4j.Logger; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.server.NIOServerCnxn; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.ServerStats; -import org.apache.zookeeper.test.ClientBase; - /** - * This unit test tests closing ledgers sequentially. - * It creates 4 ledgers, then write 1000 entries to each - * ledger and close it. + * This unit test tests closing ledgers sequentially. It creates 4 ledgers, then + * write 1000 entries to each ledger and close it. * */ -public class CloseTest -extends TestCase -implements Watcher { +public class CloseTest extends BaseTestCase{ static Logger LOG = Logger.getLogger(LedgerRecoveryTest.class); - - BookieServer bs1, bs2, bs3; - File tmpDir1, tmpDir2, tmpDir3, tmpDirZK; - private static final String HOSTPORT = "127.0.0.1:33299"; - private NIOServerCnxn.Factory serverFactory; - - private static String BOOKIEADDR1 = "127.0.0.1:33300"; - private static String BOOKIEADDR2 = "127.0.0.1:33301"; - private static String BOOKIEADDR3 = "127.0.0.1:33302"; - - private static void recursiveDelete(File dir) { - File children[] = dir.listFiles(); - if (children != null) { - for(File child: children) { - recursiveDelete(child); - } - } - dir.delete(); - } - - protected void setUp() throws Exception { - /* - * Creates 3 BookieServers - */ - - - tmpDir1 = File.createTempFile("bookie1", "test"); - tmpDir1.delete(); - tmpDir1.mkdir(); - - final int PORT1 = Integer.parseInt(BOOKIEADDR1.split(":")[1]); - bs1 = new BookieServer(PORT1, tmpDir1, new File[] { tmpDir1 }); - bs1.start(); - - tmpDir2 = File.createTempFile("bookie2", "test"); - tmpDir2.delete(); - tmpDir2.mkdir(); - - final int PORT2 = Integer.parseInt(BOOKIEADDR2.split(":")[1]); - bs2 = new BookieServer(PORT2, tmpDir2, new File[] { tmpDir2 }); - bs2.start(); - - tmpDir3 = File.createTempFile("bookie3", "test"); - tmpDir3.delete(); - tmpDir3.mkdir(); - - final int PORT3 = Integer.parseInt(BOOKIEADDR3.split(":")[1]); - bs3 = new BookieServer(PORT3, tmpDir3, new File[] { tmpDir3 }); - bs3.start(); - - /* - * Instantiates a ZooKeeper server. This is a blind copy - * of setUp from SessionTest.java. - */ - LOG.info("STARTING " + getName()); - - //ServerStats.registerAsConcrete(); - - tmpDirZK = ClientBase.createTmpDir(); + DigestType digestType; - ClientBase.setupTestEnv(); - ZooKeeperServer zs = new ZooKeeperServer(tmpDirZK, tmpDirZK, 3000); - - final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); - serverFactory = new NIOServerCnxn.Factory(PORT); - serverFactory.startup(zs); - - assertTrue("waiting for server up", - ClientBase.waitForServerUp(HOSTPORT, - CONNECTION_TIMEOUT)); - - /* - * Creating necessary znodes - */ - try{ - ZooKeeper zk = new ZooKeeper(HOSTPORT, 3000, this); - zk.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/ledgers/available/" + BOOKIEADDR1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT ); - zk.create("/ledgers/available/" + BOOKIEADDR2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT ); - zk.create("/ledgers/available/" + BOOKIEADDR3, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT ); - zk.close(); - } catch (KeeperException ke) { - LOG.error(ke); - fail("Couldn't execute ZooKeeper start procedure"); - } - - } - - /** - * Watcher method. - */ - synchronized public void process(WatchedEvent event) { - LOG.info("Process: " + event.getType() + " " + event.getPath()); - } - - protected void tearDown() throws Exception { - LOG.info("### Tear down ###"); - bs1.shutdown(); - recursiveDelete(tmpDir1); - - bs2.shutdown(); - recursiveDelete(tmpDir2); - - bs3.shutdown(); - recursiveDelete(tmpDir3); - - serverFactory.shutdown(); - assertTrue("waiting for server down", - ClientBase.waitForServerDown(HOSTPORT, - CONNECTION_TIMEOUT)); - - //ServerStats.unregister(); - recursiveDelete(tmpDirZK); - LOG.info("FINISHED " + getName()); + public CloseTest(DigestType digestType) { + super(3); + this.digestType = digestType; } @Test - public void testClose(){ - /* - * Instantiate BookKeeper object. - */ - BookKeeper bk = null; - try{ - bk = new BookKeeper(HOSTPORT); - } catch (KeeperException ke){ - LOG.error("Error instantiating BookKeeper", ke); - fail("ZooKeeper error"); - } catch (IOException ioe){ - LOG.error(ioe); - fail("Failure due to IOException"); - } - + public void testClose() throws Exception { + /* * Create 4 ledgers. */ - LedgerHandle lh1 = null; - LedgerHandle lh2 = null; - LedgerHandle lh3 = null; - LedgerHandle lh4 = null; - - try{ - lh1 = bk.createLedger("".getBytes()); - lh2 = bk.createLedger("".getBytes()); - lh3 = bk.createLedger("".getBytes()); - lh4 = bk.createLedger("".getBytes()); - } catch (KeeperException ke){ - LOG.error("Error creating a ledger", ke); - fail("ZooKeeper error"); - } catch (BKException bke){ - LOG.error("BookKeeper error"); - fail("BookKeeper error"); - } catch (InterruptedException ie) { - LOG.error(ie); - fail("Failure due to interrupted exception"); - } catch (IOException ioe) { - LOG.error(ioe); - fail("Failure due to IO exception"); - } - - /* - * Write a 1000 entries to lh1. - */ - try{ - String tmp = "BookKeeper is cool!"; - for(int i = 0; i < 1000; i++){ - lh1.addEntry(tmp.getBytes()); - } - } catch(InterruptedException e){ - LOG.error("Interrupted when adding entry", e); - fail("Couldn't finish adding entries"); - } catch(BKException e){ - LOG.error("BookKeeper exception", e); - fail("BookKeeper exception when adding entries"); - } - - try{ - lh1.close(); - } catch(Exception e) { - LOG.error(e); - fail("Exception while closing ledger 1"); - } - /* - * Write a 1000 entries to lh2. - */ - try{ - String tmp = "BookKeeper is cool!"; - for(int i = 0; i < 1000; i++){ - lh2.addEntry(tmp.getBytes()); - } - } catch(InterruptedException e){ - LOG.error("Interrupted when adding entry", e); - fail("Couldn't finish adding entries"); - } catch(BKException e){ - LOG.error("BookKeeper exception", e); - fail("CBookKeeper exception while adding entries"); - } - - try{ - lh2.close(); - } catch(Exception e){ - LOG.error(e); - fail("Exception while closing ledger 2"); + int numLedgers = 4; + int numMsgs = 100; + + LedgerHandle[] lh = new LedgerHandle[numLedgers]; + for (int i = 0; i < numLedgers; i++) { + lh[i] = bkc.createLedger(digestType, "".getBytes()); } - + + String tmp = "BookKeeper is cool!"; + /* - * Write a 1000 entries to lh3 and lh4. + * Write 1000 entries to lh1. */ - try{ - String tmp = "BookKeeper is cool!"; - for(int i = 0; i < 1000; i++){ - lh3.addEntry(tmp.getBytes()); - lh4.addEntry(tmp.getBytes()); + for (int i = 0; i < numMsgs; i++) { + for (int j = 0; j < numLedgers; j++) { + lh[j].addEntry(tmp.getBytes()); } - } catch(InterruptedException e){ - LOG.error("Interrupted when adding entry", e); - fail("Couldn't finish adding entries"); - } catch(BKException e){ - LOG.error("BookKeeper exception", e); - fail("BookKeeper exception when adding entries"); } - - try{ - lh3.close(); - lh4.close(); - } catch(Exception e){ - LOG.error(e); - fail("Exception while closing ledger 4"); + + for (int i = 0; i < numLedgers; i++) { + + lh[i].close(); } - } + } } - - \ No newline at end of file
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/ConcurrentLedgerTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/ConcurrentLedgerTest.java?rev=903483&view=auto ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/ConcurrentLedgerTest.java (added) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/ConcurrentLedgerTest.java Tue Jan 26 23:16:45 2010 @@ -0,0 +1,178 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.bookkeeper.test; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import junit.framework.TestCase; + +/** + * Tests writing to concurrent ledgers + */ +public class ConcurrentLedgerTest extends TestCase { + Bookie bookie; + File txnDir, ledgerDir; + int recvTimeout = 10000; + Semaphore throttle; + + @Override + @Before + public void setUp() throws IOException { + String txnDirName = System.getProperty("txnDir"); + if (txnDirName != null) { + txnDir = new File(txnDirName); + } + String ledgerDirName = System.getProperty("ledgerDir"); + if (ledgerDirName != null) { + ledgerDir = new File(ledgerDirName); + } + File tmpFile = File.createTempFile("book", ".txn", txnDir); + tmpFile.delete(); + txnDir = new File(tmpFile.getParent(), tmpFile.getName()+".dir"); + txnDir.mkdirs(); + tmpFile = File.createTempFile("book", ".ledger", ledgerDir); + ledgerDir = new File(tmpFile.getParent(), tmpFile.getName()+".dir"); + ledgerDir.mkdirs(); + + bookie = new Bookie(txnDir, new File[] {ledgerDir}); + } + + static void recursiveDelete(File f) { + if (f.isFile()) { + f.delete(); + } else { + for(File i: f.listFiles()) { + recursiveDelete(i); + } + f.delete(); + } + } + + @Override + @After + public void tearDown() { + try { + bookie.shutdown(); + recursiveDelete(txnDir); + recursiveDelete(ledgerDir); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + byte zeros[] = new byte[16]; + + int iterations = 51; + { + String iterationsString = System.getProperty("iterations"); + if (iterationsString != null) { + iterations = Integer.parseInt(iterationsString); + } + } + int iterationStep = 25; + { + String iterationsString = System.getProperty("iterationStep"); + if (iterationsString != null) { + iterationStep = Integer.parseInt(iterationsString); + } + } + @Test + public void testConcurrentWrite() throws IOException, InterruptedException, BookieException { + int size = 1024; + int totalwrites = 128; + if (System.getProperty("totalwrites") != null) { + totalwrites = Integer.parseInt(System.getProperty("totalwrites")); + } + System.out.println("Running up to " + iterations + " iterations"); + System.out.println("Total writes = " + totalwrites); + int ledgers; + for(ledgers = 1; ledgers <= iterations; ledgers += iterationStep) { + long duration = doWrites(ledgers, size, totalwrites); + System.out.println(totalwrites + " on " + ledgers + " took " + duration + " ms"); + } + System.out.println("ledgers " + ledgers); + for(ledgers = 1; ledgers <= iterations; ledgers += iterationStep) { + long duration = doReads(ledgers, size, totalwrites); + System.out.println(ledgers + " read " + duration + " ms"); + } + } + + private long doReads(int ledgers, int size, int totalwrites) + throws IOException, InterruptedException, BookieException { + long start = System.currentTimeMillis(); + for(int i = 1; i <= totalwrites/ledgers; i++) { + for(int j = 1; j <= ledgers; j++) { + ByteBuffer entry = bookie.readEntry(j, i); + // skip the ledger id and the entry id + entry.getLong(); + entry.getLong(); + assertEquals(j + "@" + i, j+2, entry.getLong()); + assertEquals(j + "@" + i, i+3, entry.getLong()); + } + } + long finish = System.currentTimeMillis(); + return finish - start; + } + private long doWrites(int ledgers, int size, int totalwrites) + throws IOException, InterruptedException, BookieException { + throttle = new Semaphore(10000); + WriteCallback cb = new WriteCallback() { + @Override + public void writeComplete(int rc, long ledgerId, long entryId, + InetSocketAddress addr, Object ctx) { + AtomicInteger counter = (AtomicInteger)ctx; + counter.getAndIncrement(); + throttle.release(); + } + }; + AtomicInteger counter = new AtomicInteger(); + long start = System.currentTimeMillis(); + for(int i = 1; i <= totalwrites/ledgers; i++) { + for(int j = 1; j <= ledgers; j++) { + ByteBuffer bytes = ByteBuffer.allocate(size); + bytes.putLong(j); + bytes.putLong(i); + bytes.putLong(j+2); + bytes.putLong(i+3); + bytes.put(("This is ledger " + j + " entry " + i).getBytes()); + bytes.position(0); + bytes.limit(bytes.capacity()); + throttle.acquire(); + bookie.addEntry(bytes, cb, counter, zeros); + } + } + long finish = System.currentTimeMillis(); + return finish - start; + } +} Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java?rev=903483&r1=903482&r2=903483&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java Tue Jan 26 23:16:45 2010 @@ -1,4 +1,5 @@ package org.apache.bookkeeper.test; + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,318 +21,65 @@ * */ - -import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; - -import java.lang.InterruptedException; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; - -import junit.framework.TestCase; - import org.junit.*; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.log4j.Logger; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.server.NIOServerCnxn; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.ServerStats; -import org.apache.zookeeper.test.ClientBase; - /** - * This unit test tests ledger recovery. + * This unit test tests ledger recovery. + * * - * */ -public class LedgerRecoveryTest -extends TestCase -implements Watcher { +public class LedgerRecoveryTest extends BaseTestCase { static Logger LOG = Logger.getLogger(LedgerRecoveryTest.class); - - BookieServer bs1, bs2, bs3; - File tmpDir1, tmpDir2, tmpDir3, tmpDirZK; - private static final String HOSTPORT = "127.0.0.1:33299"; - private NIOServerCnxn.Factory serverFactory; - - private static String BOOKIEADDR1 = "127.0.0.1:33300"; - private static String BOOKIEADDR2 = "127.0.0.1:33301"; - private static String BOOKIEADDR3 = "127.0.0.1:33302"; - - private static void recursiveDelete(File dir) { - File children[] = dir.listFiles(); - if (children != null) { - for(File child: children) { - recursiveDelete(child); - } - } - dir.delete(); - } - - protected void setUp() throws Exception { - /* - * Creates 3 BookieServers - */ - - - tmpDir1 = File.createTempFile("bookie1", "test"); - tmpDir1.delete(); - tmpDir1.mkdir(); - - final int PORT1 = Integer.parseInt(BOOKIEADDR1.split(":")[1]); - bs1 = new BookieServer(PORT1, tmpDir1, new File[] { tmpDir1 }); - bs1.start(); - - tmpDir2 = File.createTempFile("bookie2", "test"); - tmpDir2.delete(); - tmpDir2.mkdir(); - - final int PORT2 = Integer.parseInt(BOOKIEADDR2.split(":")[1]); - bs2 = new BookieServer(PORT2, tmpDir2, new File[] { tmpDir2 }); - bs2.start(); - - tmpDir3 = File.createTempFile("bookie3", "test"); - tmpDir3.delete(); - tmpDir3.mkdir(); - - final int PORT3 = Integer.parseInt(BOOKIEADDR3.split(":")[1]); - bs3 = new BookieServer(PORT3, tmpDir3, new File[] { tmpDir3 }); - bs3.start(); - - /* - * Instantiates a ZooKeeper server. This is a blind copy - * of setUp from SessionTest.java. - */ - LOG.info("STARTING " + getName()); - - //ServerStats.registerAsConcrete(); - tmpDirZK = ClientBase.createTmpDir(); + DigestType digestType; - ClientBase.setupTestEnv(); - ZooKeeperServer zs = new ZooKeeperServer(tmpDirZK, tmpDirZK, 3000); - - final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); - serverFactory = new NIOServerCnxn.Factory(PORT); - serverFactory.startup(zs); - - assertTrue("waiting for server up", - ClientBase.waitForServerUp(HOSTPORT, - CONNECTION_TIMEOUT)); - - /* - * Creating necessary znodes - */ - try{ - ZooKeeper zk = new ZooKeeper(HOSTPORT, 3000, this); - zk.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/ledgers/available/" + BOOKIEADDR1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT ); - zk.create("/ledgers/available/" + BOOKIEADDR2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT ); - zk.create("/ledgers/available/" + BOOKIEADDR3, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT ); - zk.close(); - } catch (KeeperException ke) { - LOG.error(ke); - fail("Couldn't execute ZooKeeper start procedure"); - } - - } - - /** - * Watcher method. - */ - synchronized public void process(WatchedEvent event) { - LOG.info("Process: " + event.getType() + " " + event.getPath()); + public LedgerRecoveryTest(DigestType digestType) { + super(3); + this.digestType = digestType; } - - protected void tearDown() throws Exception { - LOG.info("### Tear down ###"); - bs1.shutdown(); - recursiveDelete(tmpDir1); - - bs2.shutdown(); - recursiveDelete(tmpDir2); - - bs3.shutdown(); - recursiveDelete(tmpDir3); - - serverFactory.shutdown(); - assertTrue("waiting for server down", - ClientBase.waitForServerDown(HOSTPORT, - CONNECTION_TIMEOUT)); - - //ServerStats.unregister(); - recursiveDelete(tmpDirZK); - LOG.info("FINISHED " + getName()); - } - - @Test - public void testLedgerRecovery(){ - /* - * Instantiate BookKeeper object. - */ - BookKeeper bk = null; - try{ - bk = new BookKeeper(HOSTPORT); - } catch (KeeperException ke){ - LOG.error("Error instantiating BookKeeper", ke); - fail("ZooKeeper error"); - } catch (IOException ioe){ - LOG.error(ioe); - fail("Failure due to IOException"); - } - + + private void testInternal(int numEntries) throws Exception { /* * Create ledger. */ LedgerHandle beforelh = null; - try{ - beforelh = bk.createLedger("".getBytes()); - } catch (KeeperException ke){ - LOG.error("Error creating a ledger", ke); - fail("ZooKeeper error"); - } catch (BKException bke){ - LOG.error("BookKeeper error"); - fail("BookKeeper error"); - } catch (InterruptedException ie) { - LOG.error(ie); - fail("Failure due to interrupted exception"); - } catch (IOException ioe) { - LOG.error(ioe); - fail("Failure due to IO exception"); + beforelh = bkc.createLedger(digestType, "".getBytes()); + + String tmp = "BookKeeper is cool!"; + for (int i = 0; i < numEntries; i++) { + beforelh.addEntry(tmp.getBytes()); } - + /* - * Write a 1000 entries. + * Try to open ledger. */ - try{ - String tmp = "BookKeeper is cool!"; - for(int i = 0; i < 1000; i++){ - beforelh.addEntry(tmp.getBytes()); - } - - //bk.resetLedger(beforelh); - } catch(InterruptedException e){ - LOG.error("Interrupted when adding entry", e); - fail("Couldn't finish adding entries"); - } catch(BKException e){ - LOG.error("BookKeeper exception", e); - fail("BookKeeper exception while adding entries"); - } - + LedgerHandle afterlh = bkc.openLedger(beforelh.getId(), digestType, "".getBytes()); + /* - * Try to open ledger. + * Check if has recovered properly. */ - try{ - LedgerHandle afterlh = bk.openLedger(beforelh.getId(), "".getBytes()); - - /* - * Check if has recovered properly. - */ - assertTrue("Has not recovered correctly: " + afterlh.getLast(), afterlh.getLast() == 999); - } catch (KeeperException e) { - LOG.error("Error when opening ledger", e); - fail("Couldn't open ledger"); - } catch (InterruptedException ie) { - LOG.error("Interrupted exception", ie); - fail("Failure due to interrupted exception"); - } catch (IOException ioe) { - LOG.error("IO Exception", ioe); - fail("Failure due to IO exception"); - } catch (BKException bke){ - LOG.error("BookKeeper error", bke); - fail("BookKeeper error"); - } - + assertTrue("Has not recovered correctly: " + afterlh.getLastAddConfirmed(), + afterlh.getLastAddConfirmed() == numEntries - 1); } @Test - public void testEmptyLedgerRecovery(){ - /* - * Instantiate BookKeeper object. - */ - BookKeeper bk = null; - try{ - bk = new BookKeeper(HOSTPORT); - } catch (KeeperException ke){ - LOG.error("Error instantiating BookKeeper", ke); - fail("ZooKeeper error"); - } catch (IOException ioe){ - LOG.error(ioe); - fail("Failure due to IOException"); - } - - /* - * Create ledger. - */ - LedgerHandle beforelh = null; - try{ - beforelh = bk.createLedger("".getBytes()); - } catch (KeeperException ke){ - LOG.error("Error creating a ledger", ke); - fail("ZooKeeper error"); - } catch (BKException bke){ - LOG.error("BookKeeper error"); - fail("BookKeeper error"); - } catch (InterruptedException ie) { - LOG.error(ie); - fail("Failure due to interrupted exception"); - } catch (IOException ioe) { - LOG.error(ioe); - fail("Failure due to IO exception"); - } - - /* - * Write a 1 entry. - */ - try{ - String tmp = "BookKeeper is cool!"; - for(int i = 0; i < 1; i++){ - beforelh.addEntry(tmp.getBytes()); - } - } catch(InterruptedException e){ - LOG.error("Interrupted when adding entry", e); - fail("Couldn't finish adding entries"); - } catch(BKException e){ - LOG.error("BookKeeper exception", e); - fail("BookKeeper exception while adding entries"); - } - - - /* - * Try to open ledger. - */ - try{ - LedgerHandle afterlh = bk.openLedger(beforelh.getId(), "".getBytes()); - - /* - * Check if has recovered properly. - */ - assertTrue("Has not recovered correctly: " + afterlh.getLast(), afterlh.getLast() == 0); - } catch (KeeperException e) { - LOG.error("Error when opening ledger", e); - fail("Couldn't open ledger"); - } catch (InterruptedException ie) { - LOG.error("Interrupted exception", ie); - fail("Failure due to interrupted exception"); - } catch (IOException ioe) { - LOG.error("IO Exception", ioe); - fail("Failure due to IO exception"); - } catch (BKException bke){ - LOG.error("BookKeeper error", bke); - fail("BookKeeper error"); - } - + public void testLedgerRecovery() throws Exception { + testInternal(100); + } - + + @Test + public void testEmptyLedgerRecoveryOne() throws Exception{ + testInternal(1); + } + + @Test + public void testEmptyLedgerRecovery() throws Exception{ + testInternal(0); + } + } Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java?rev=903483&r1=903482&r2=903483&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java Tue Jan 26 23:16:45 2010 @@ -1,4 +1,5 @@ package org.apache.bookkeeper.test; + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,103 +21,97 @@ * */ - import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.io.IOException; import java.lang.InterruptedException; import java.util.Arrays; +import java.util.concurrent.Executors; import org.apache.bookkeeper.proto.BookieClient; -import org.apache.bookkeeper.proto.WriteCallback; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; +import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.log4j.Logger; - +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; /** * This class tests BookieClient. It just sends the a new entry to itself. * * - * + * */ - class LoopbackClient implements WriteCallback { Logger LOG = Logger.getLogger(LoopbackClient.class); BookieClient client; static int recvTimeout = 2000; long begin = 0; int limit; - - + OrderedSafeExecutor executor; + static class Counter { int c; int limit; - - Counter(int limit){ + + Counter(int limit) { this.c = 0; this.limit = limit; } - - synchronized void increment(){ - if(++c == limit) + + synchronized void increment() { + if (++c == limit) this.notify(); } } - - LoopbackClient(int port, long begin, int limit) - throws IOException { - this.client = - new BookieClient(new InetSocketAddress("127.0.0.1", port), recvTimeout); + + LoopbackClient(ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor, long begin, int limit) throws IOException { + this.client = new BookieClient(channelFactory, executor); this.begin = begin; } - - - void write(long ledgerId, long entry, byte[] data, WriteCallback cb, Object ctx) - throws IOException, InterruptedException { + + void write(long ledgerId, long entry, byte[] data, InetSocketAddress addr, WriteCallback cb, Object ctx) + throws IOException, InterruptedException { LOG.info("Ledger id: " + ledgerId + ", Entry: " + entry); byte[] passwd = new byte[20]; Arrays.fill(passwd, (byte) 'a'); - - client.addEntry(ledgerId, - passwd, - entry, - ByteBuffer.wrap(data), - cb, - ctx); + + client.addEntry(addr, ledgerId, passwd, entry, ChannelBuffers.wrappedBuffer(data), cb, ctx); } - - public void writeComplete(int rc, long ledgerId, long entryId, Object ctx){ + + public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) { Counter counter = (Counter) ctx; counter.increment(); } - - - public static void main(String args[]){ + + public static void main(String args[]) { byte[] data = new byte[Integer.parseInt(args[0])]; Integer limit = Integer.parseInt(args[1]); Counter c = new Counter(limit); long ledgerId = Long.valueOf("0").longValue(); long begin = System.currentTimeMillis(); - + LoopbackClient lb; - try{ - lb = new LoopbackClient(Integer.valueOf(args[2]).intValue(), - begin, - limit.intValue()); - - for(int i = 0; i < limit ; i++){ - lb.write(ledgerId, i, data, lb, c); + ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors + .newCachedThreadPool()); + OrderedSafeExecutor executor = new OrderedSafeExecutor(2); + try { + InetSocketAddress addr = new InetSocketAddress("127.0.0.1", Integer.valueOf(args[2]).intValue()); + lb = new LoopbackClient(channelFactory, executor, begin, limit.intValue()); + + for (int i = 0; i < limit; i++) { + lb.write(ledgerId, i, data, addr, lb, c); } - - synchronized(c){ + + synchronized (c) { c.wait(); System.out.println("Time to write all entries: " + (System.currentTimeMillis() - begin)); } - } catch (IOException e){ + } catch (IOException e) { e.printStackTrace(); - } catch (InterruptedException e){ + } catch (InterruptedException e) { e.printStackTrace(); } - } - + } + } Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/NIOServerFactoryTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/NIOServerFactoryTest.java?rev=903483&r1=903482&r2=903483&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/NIOServerFactoryTest.java (original) +++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/NIOServerFactoryTest.java Tue Jan 26 23:16:45 2010 @@ -1,4 +1,5 @@ package org.apache.bookkeeper.test; + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,7 +21,6 @@ * */ - import java.net.Socket; import java.nio.ByteBuffer; @@ -29,7 +29,6 @@ import org.apache.bookkeeper.proto.NIOServerFactory.PacketProcessor; import org.junit.Test; - import junit.framework.TestCase; public class NIOServerFactoryTest extends TestCase { @@ -41,8 +40,9 @@ } src.sendResponse(new ByteBuffer[] { ByteBuffer.allocate(4) }); } - + }; + @Test public void testProblemProcessor() throws Exception { NIOServerFactory factory = new NIOServerFactory(22334, problemProcessor);