Modified: 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java
 Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.test;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,275 +21,54 @@
  * 
  */
 
-
-import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
-
-import java.lang.InterruptedException;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import junit.framework.TestCase;
-
 import org.junit.*;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.log4j.Logger;
 
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ServerStats;
-import org.apache.zookeeper.test.ClientBase;
-
 /**
- * This unit test tests closing ledgers sequentially. 
- * It creates 4 ledgers, then write 1000 entries to each 
- * ledger and close it.
+ * This unit test tests closing ledgers sequentially. It creates 4 ledgers, 
then
+ * write 1000 entries to each ledger and close it.
  * 
  */
 
-public class CloseTest 
-extends TestCase 
-implements Watcher {
+public class CloseTest extends BaseTestCase{
     static Logger LOG = Logger.getLogger(LedgerRecoveryTest.class);
-    
-    BookieServer bs1, bs2, bs3;
-    File tmpDir1, tmpDir2, tmpDir3, tmpDirZK;
-    private static final String HOSTPORT = "127.0.0.1:33299";
-    private NIOServerCnxn.Factory serverFactory;
-    
-    private static String BOOKIEADDR1 = "127.0.0.1:33300";
-    private static String BOOKIEADDR2 = "127.0.0.1:33301";
-    private static String BOOKIEADDR3 = "127.0.0.1:33302";
-    
-    private static void recursiveDelete(File dir) {
-        File children[] = dir.listFiles();
-        if (children != null) {
-            for(File child: children) {
-                recursiveDelete(child);
-            }
-        }
-        dir.delete();
-    }
-    
-    protected void setUp() throws Exception {
-        /*
-         * Creates 3 BookieServers
-         */
-        
-        
-        tmpDir1 = File.createTempFile("bookie1", "test");
-        tmpDir1.delete();
-        tmpDir1.mkdir();
-        
-        final int PORT1 = Integer.parseInt(BOOKIEADDR1.split(":")[1]);
-        bs1 = new BookieServer(PORT1, tmpDir1, new File[] { tmpDir1 });
-        bs1.start();
-        
-        tmpDir2 = File.createTempFile("bookie2", "test");
-        tmpDir2.delete();
-        tmpDir2.mkdir();
-        
-        final int PORT2 = Integer.parseInt(BOOKIEADDR2.split(":")[1]);
-        bs2 = new BookieServer(PORT2, tmpDir2, new File[] { tmpDir2 });
-        bs2.start();
-        
-        tmpDir3 = File.createTempFile("bookie3", "test");
-        tmpDir3.delete();
-        tmpDir3.mkdir();
-        
-        final int PORT3 = Integer.parseInt(BOOKIEADDR3.split(":")[1]);
-        bs3 = new BookieServer(PORT3, tmpDir3, new File[] { tmpDir3 });
-        bs3.start();
-        
-        /*
-         * Instantiates a ZooKeeper server. This is a blind copy
-         * of setUp from SessionTest.java.
-         */
-        LOG.info("STARTING " + getName());
-
-        //ServerStats.registerAsConcrete();
-
-        tmpDirZK = ClientBase.createTmpDir();
+    DigestType digestType;
 
-        ClientBase.setupTestEnv();
-        ZooKeeperServer zs = new ZooKeeperServer(tmpDirZK, tmpDirZK, 3000);
-        
-        final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
-        serverFactory = new NIOServerCnxn.Factory(PORT);
-        serverFactory.startup(zs);
-
-        assertTrue("waiting for server up",
-                   ClientBase.waitForServerUp(HOSTPORT,
-                                              CONNECTION_TIMEOUT));
-        
-        /*
-         * Creating necessary znodes
-         */
-        try{
-            ZooKeeper zk = new ZooKeeper(HOSTPORT, 3000, this);
-            zk.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-            zk.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-            zk.create("/ledgers/available/" + BOOKIEADDR1, new byte[0], 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
-            zk.create("/ledgers/available/" + BOOKIEADDR2, new byte[0], 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
-            zk.create("/ledgers/available/" + BOOKIEADDR3, new byte[0], 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
-            zk.close();
-        } catch (KeeperException ke) {
-            LOG.error(ke);
-            fail("Couldn't execute ZooKeeper start procedure");
-        }
-        
-    }
-    
-    /**
-     * Watcher method. 
-     */
-    synchronized public void process(WatchedEvent event) {
-        LOG.info("Process: " + event.getType() + " " + event.getPath());
-    }
-    
-    protected void tearDown() throws Exception {
-        LOG.info("### Tear down ###");
-        bs1.shutdown();
-        recursiveDelete(tmpDir1);
-        
-        bs2.shutdown();
-        recursiveDelete(tmpDir2);
-        
-        bs3.shutdown();
-        recursiveDelete(tmpDir3);
-        
-        serverFactory.shutdown();
-        assertTrue("waiting for server down",
-                   ClientBase.waitForServerDown(HOSTPORT,
-                                                CONNECTION_TIMEOUT));
-
-        //ServerStats.unregister();
-        recursiveDelete(tmpDirZK);
-        LOG.info("FINISHED " + getName());
+    public CloseTest(DigestType digestType) {
+        super(3);
+        this.digestType = digestType;
     }
 
     @Test
-    public void testClose(){
-        /*
-         * Instantiate BookKeeper object.
-         */
-        BookKeeper bk = null;
-        try{
-            bk = new BookKeeper(HOSTPORT);
-        } catch (KeeperException ke){
-            LOG.error("Error instantiating BookKeeper", ke);
-            fail("ZooKeeper error");
-        } catch (IOException ioe){
-            LOG.error(ioe);
-            fail("Failure due to IOException");
-        }
-        
+    public void testClose() throws Exception {
+
         /*
          * Create 4 ledgers.
          */
-        LedgerHandle lh1 = null;
-        LedgerHandle lh2 = null;
-        LedgerHandle lh3 = null;
-        LedgerHandle lh4 = null;
-        
-        try{
-            lh1 = bk.createLedger("".getBytes());
-            lh2 = bk.createLedger("".getBytes());
-            lh3 = bk.createLedger("".getBytes());
-            lh4 = bk.createLedger("".getBytes());
-        } catch (KeeperException ke){
-            LOG.error("Error creating a ledger", ke);
-            fail("ZooKeeper error");            
-        } catch (BKException bke){
-            LOG.error("BookKeeper error");
-            fail("BookKeeper error");
-        } catch (InterruptedException ie) {
-            LOG.error(ie);
-            fail("Failure due to interrupted exception");
-        } catch (IOException ioe) {
-            LOG.error(ioe);
-            fail("Failure due to IO exception");
-        }
-        
-        /*
-         * Write a 1000 entries to lh1.
-         */
-        try{
-            String tmp = "BookKeeper is cool!";
-            for(int i = 0; i < 1000; i++){
-                lh1.addEntry(tmp.getBytes());
-            }
-        } catch(InterruptedException e){
-            LOG.error("Interrupted when adding entry", e);
-            fail("Couldn't finish adding entries");
-        } catch(BKException e){
-            LOG.error("BookKeeper exception", e);
-            fail("BookKeeper exception when adding entries");
-        }
-        
-        try{
-            lh1.close();
-        } catch(Exception e) {
-            LOG.error(e);
-            fail("Exception while closing ledger 1");
-        }
-        /*
-         * Write a 1000 entries to lh2.
-         */
-        try{
-            String tmp = "BookKeeper is cool!";
-            for(int i = 0; i < 1000; i++){
-                lh2.addEntry(tmp.getBytes());
-            }
-        } catch(InterruptedException e){
-            LOG.error("Interrupted when adding entry", e);
-            fail("Couldn't finish adding entries");
-        } catch(BKException e){
-            LOG.error("BookKeeper exception", e);
-            fail("CBookKeeper exception while adding entries");
-        }
-        
-        try{
-            lh2.close();
-        } catch(Exception e){
-            LOG.error(e);
-            fail("Exception while closing ledger 2");
+        int numLedgers = 4;
+        int numMsgs = 100;
+
+        LedgerHandle[] lh = new LedgerHandle[numLedgers];
+        for (int i = 0; i < numLedgers; i++) {
+            lh[i] = bkc.createLedger(digestType, "".getBytes());
         }
-        
+
+        String tmp = "BookKeeper is cool!";
+
         /*
-         * Write a 1000 entries to lh3 and lh4.
+         * Write 1000 entries to lh1.
          */
-        try{
-            String tmp = "BookKeeper is cool!";
-            for(int i = 0; i < 1000; i++){
-                lh3.addEntry(tmp.getBytes());
-                lh4.addEntry(tmp.getBytes());
+        for (int i = 0; i < numMsgs; i++) {
+            for (int j = 0; j < numLedgers; j++) {
+                lh[j].addEntry(tmp.getBytes());
             }
-        } catch(InterruptedException e){
-            LOG.error("Interrupted when adding entry", e);
-            fail("Couldn't finish adding entries");
-        } catch(BKException e){
-            LOG.error("BookKeeper exception", e);
-            fail("BookKeeper exception when adding entries");
         }
-        
-        try{
-            lh3.close();
-            lh4.close();
-        } catch(Exception e){
-            LOG.error(e);
-            fail("Exception while closing ledger 4");
+
+        for (int i = 0; i < numLedgers; i++) {
+
+            lh[i].close();
         }
-    }      
+    }
 }
-    
-    
\ No newline at end of file

Added: 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/ConcurrentLedgerTest.java?rev=903483&view=auto
==============================================================================
--- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
 (added)
+++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
 Tue Jan 26 23:16:45 2010
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests writing to concurrent ledgers
+ */
+public class ConcurrentLedgerTest extends TestCase {
+    Bookie bookie;
+    File txnDir, ledgerDir;
+    int recvTimeout = 10000;
+    Semaphore throttle;
+    
+    @Override
+    @Before
+    public void setUp() throws IOException {
+        String txnDirName = System.getProperty("txnDir");
+        if (txnDirName != null) {
+            txnDir = new File(txnDirName);
+        }
+        String ledgerDirName = System.getProperty("ledgerDir");
+        if (ledgerDirName != null) {
+            ledgerDir = new File(ledgerDirName);
+        }
+        File tmpFile = File.createTempFile("book", ".txn", txnDir);
+        tmpFile.delete();
+        txnDir = new File(tmpFile.getParent(), tmpFile.getName()+".dir");
+        txnDir.mkdirs();
+        tmpFile = File.createTempFile("book", ".ledger", ledgerDir);
+        ledgerDir = new File(tmpFile.getParent(), tmpFile.getName()+".dir");
+        ledgerDir.mkdirs();
+        
+        bookie = new Bookie(txnDir, new File[] {ledgerDir});
+    }
+    
+    static void recursiveDelete(File f) {
+        if (f.isFile()) {
+            f.delete();
+        } else {
+            for(File i: f.listFiles()) {
+                recursiveDelete(i);
+            }
+            f.delete();
+        }
+    }
+    
+    @Override
+    @After
+    public void tearDown() {
+        try {
+            bookie.shutdown();
+            recursiveDelete(txnDir);
+            recursiveDelete(ledgerDir);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    byte zeros[] = new byte[16];
+
+    int iterations = 51;
+    {
+        String iterationsString = System.getProperty("iterations");
+        if (iterationsString != null) {
+            iterations = Integer.parseInt(iterationsString);
+        }
+    }
+    int iterationStep = 25;
+    {
+        String iterationsString = System.getProperty("iterationStep");
+        if (iterationsString != null) {
+            iterationStep = Integer.parseInt(iterationsString);
+        }
+    }
+    @Test
+    public void testConcurrentWrite() throws IOException, 
InterruptedException, BookieException {
+        int size = 1024;
+        int totalwrites = 128;
+        if (System.getProperty("totalwrites") != null) {
+            totalwrites = Integer.parseInt(System.getProperty("totalwrites"));
+        }
+        System.out.println("Running up to " + iterations + " iterations");
+        System.out.println("Total writes = " + totalwrites);
+        int ledgers;
+        for(ledgers = 1; ledgers <= iterations; ledgers += iterationStep) {
+            long duration = doWrites(ledgers, size, totalwrites);
+            System.out.println(totalwrites + " on " + ledgers + " took " + 
duration + " ms");
+        }
+        System.out.println("ledgers " + ledgers);
+        for(ledgers = 1; ledgers <= iterations; ledgers += iterationStep) {
+            long duration = doReads(ledgers, size, totalwrites);
+            System.out.println(ledgers + " read " + duration + " ms");
+        }
+    }
+
+    private long doReads(int ledgers, int size, int totalwrites)
+            throws IOException, InterruptedException, BookieException {
+        long start = System.currentTimeMillis();
+        for(int i = 1; i <= totalwrites/ledgers; i++) {
+            for(int j = 1; j <= ledgers; j++) {
+                ByteBuffer entry = bookie.readEntry(j, i);
+                // skip the ledger id and the entry id
+                entry.getLong();
+                entry.getLong();
+                assertEquals(j + "@" + i, j+2, entry.getLong());
+                assertEquals(j + "@" + i, i+3, entry.getLong());
+            }
+        }
+        long finish = System.currentTimeMillis();
+        return finish - start;
+    }
+    private long doWrites(int ledgers, int size, int totalwrites)
+            throws IOException, InterruptedException, BookieException {
+        throttle = new Semaphore(10000);
+        WriteCallback cb = new WriteCallback() {
+            @Override
+            public void writeComplete(int rc, long ledgerId, long entryId,
+                    InetSocketAddress addr, Object ctx) {
+                AtomicInteger counter = (AtomicInteger)ctx;
+                counter.getAndIncrement();
+                throttle.release();
+            }
+        };
+        AtomicInteger counter = new AtomicInteger();
+        long start = System.currentTimeMillis();
+        for(int i = 1; i <= totalwrites/ledgers; i++) {
+            for(int j = 1; j <= ledgers; j++) {
+                ByteBuffer bytes = ByteBuffer.allocate(size);
+                bytes.putLong(j);
+                bytes.putLong(i);
+                bytes.putLong(j+2);
+                bytes.putLong(i+3);
+                bytes.put(("This is ledger " + j + " entry " + i).getBytes());
+                bytes.position(0);
+                bytes.limit(bytes.capacity());
+                throttle.acquire();
+                bookie.addEntry(bytes, cb, counter, zeros);
+            }
+        }
+        long finish = System.currentTimeMillis();
+        return finish - start;
+    }
+}

Modified: 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java
 Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.test;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,318 +21,65 @@
  * 
  */
 
-
-import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
-
-import java.lang.InterruptedException;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import junit.framework.TestCase;
-
 import org.junit.*;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.log4j.Logger;
 
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ServerStats;
-import org.apache.zookeeper.test.ClientBase;
-
 /**
- * This unit test tests ledger recovery. 
+ * This unit test tests ledger recovery.
+ * 
  * 
- *
  */
 
-public class LedgerRecoveryTest 
-extends TestCase 
-implements Watcher {
+public class LedgerRecoveryTest extends BaseTestCase {
     static Logger LOG = Logger.getLogger(LedgerRecoveryTest.class);
-    
-    BookieServer bs1, bs2, bs3;
-    File tmpDir1, tmpDir2, tmpDir3, tmpDirZK;
-    private static final String HOSTPORT = "127.0.0.1:33299";
-    private NIOServerCnxn.Factory serverFactory;
-    
-    private static String BOOKIEADDR1 = "127.0.0.1:33300";
-    private static String BOOKIEADDR2 = "127.0.0.1:33301";
-    private static String BOOKIEADDR3 = "127.0.0.1:33302";
-    
-    private static void recursiveDelete(File dir) {
-        File children[] = dir.listFiles();
-        if (children != null) {
-            for(File child: children) {
-                recursiveDelete(child);
-            }
-        }
-        dir.delete();
-    }
-    
-    protected void setUp() throws Exception {
-        /*
-         * Creates 3 BookieServers
-         */
-        
-        
-        tmpDir1 = File.createTempFile("bookie1", "test");
-        tmpDir1.delete();
-        tmpDir1.mkdir();
-        
-        final int PORT1 = Integer.parseInt(BOOKIEADDR1.split(":")[1]);
-        bs1 = new BookieServer(PORT1, tmpDir1, new File[] { tmpDir1 });
-        bs1.start();
-        
-        tmpDir2 = File.createTempFile("bookie2", "test");
-        tmpDir2.delete();
-        tmpDir2.mkdir();
-        
-        final int PORT2 = Integer.parseInt(BOOKIEADDR2.split(":")[1]);
-        bs2 = new BookieServer(PORT2, tmpDir2, new File[] { tmpDir2 });
-        bs2.start();
-        
-        tmpDir3 = File.createTempFile("bookie3", "test");
-        tmpDir3.delete();
-        tmpDir3.mkdir();
-        
-        final int PORT3 = Integer.parseInt(BOOKIEADDR3.split(":")[1]);
-        bs3 = new BookieServer(PORT3, tmpDir3, new File[] { tmpDir3 });
-        bs3.start();
-        
-        /*
-         * Instantiates a ZooKeeper server. This is a blind copy
-         * of setUp from SessionTest.java.
-         */
-        LOG.info("STARTING " + getName());
-
-        //ServerStats.registerAsConcrete();
 
-        tmpDirZK = ClientBase.createTmpDir();
+    DigestType digestType;
 
-        ClientBase.setupTestEnv();
-        ZooKeeperServer zs = new ZooKeeperServer(tmpDirZK, tmpDirZK, 3000);
-        
-        final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
-        serverFactory = new NIOServerCnxn.Factory(PORT);
-        serverFactory.startup(zs);
-
-        assertTrue("waiting for server up",
-                   ClientBase.waitForServerUp(HOSTPORT,
-                                              CONNECTION_TIMEOUT));
-        
-        /*
-         * Creating necessary znodes
-         */
-        try{
-            ZooKeeper zk = new ZooKeeper(HOSTPORT, 3000, this);
-            zk.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-            zk.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-            zk.create("/ledgers/available/" + BOOKIEADDR1, new byte[0], 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
-            zk.create("/ledgers/available/" + BOOKIEADDR2, new byte[0], 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
-            zk.create("/ledgers/available/" + BOOKIEADDR3, new byte[0], 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
-            zk.close();
-        } catch (KeeperException ke) {
-            LOG.error(ke);
-            fail("Couldn't execute ZooKeeper start procedure");
-        }
-        
-    }
-    
-    /**
-     * Watcher method. 
-     */
-    synchronized public void process(WatchedEvent event) {
-        LOG.info("Process: " + event.getType() + " " + event.getPath());
+    public LedgerRecoveryTest(DigestType digestType) {
+        super(3);
+        this.digestType = digestType;
     }
-    
-    protected void tearDown() throws Exception {
-        LOG.info("### Tear down ###");
-        bs1.shutdown();
-        recursiveDelete(tmpDir1);
-        
-        bs2.shutdown();
-        recursiveDelete(tmpDir2);
-        
-        bs3.shutdown();
-        recursiveDelete(tmpDir3);
-        
-        serverFactory.shutdown();
-        assertTrue("waiting for server down",
-                   ClientBase.waitForServerDown(HOSTPORT,
-                                                CONNECTION_TIMEOUT));
-
-        //ServerStats.unregister();
-        recursiveDelete(tmpDirZK);
-        LOG.info("FINISHED " + getName());
-    }
-    
-    @Test
-    public void testLedgerRecovery(){
-        /*
-         * Instantiate BookKeeper object.
-         */
-        BookKeeper bk = null;
-        try{
-            bk = new BookKeeper(HOSTPORT);
-        } catch (KeeperException ke){
-            LOG.error("Error instantiating BookKeeper", ke);
-            fail("ZooKeeper error");
-        } catch (IOException ioe){
-            LOG.error(ioe);
-            fail("Failure due to IOException");
-        }
-        
+
+    private void testInternal(int numEntries) throws Exception {
         /*
          * Create ledger.
          */
         LedgerHandle beforelh = null;
-        try{
-            beforelh = bk.createLedger("".getBytes());
-        } catch (KeeperException ke){
-            LOG.error("Error creating a ledger", ke);
-            fail("ZooKeeper error");            
-        } catch (BKException bke){
-            LOG.error("BookKeeper error");
-            fail("BookKeeper error");
-        } catch (InterruptedException ie) {
-            LOG.error(ie);
-            fail("Failure due to interrupted exception");
-        } catch (IOException ioe) {
-            LOG.error(ioe);
-            fail("Failure due to IO exception");
+        beforelh = bkc.createLedger(digestType, "".getBytes());
+
+        String tmp = "BookKeeper is cool!";
+        for (int i = 0; i < numEntries; i++) {
+            beforelh.addEntry(tmp.getBytes());
         }
-        
+
         /*
-         * Write a 1000 entries.
+         * Try to open ledger.
          */
-        try{
-            String tmp = "BookKeeper is cool!";
-            for(int i = 0; i < 1000; i++){
-                beforelh.addEntry(tmp.getBytes());
-            }
-            
-            //bk.resetLedger(beforelh);
-        } catch(InterruptedException e){
-            LOG.error("Interrupted when adding entry", e);
-            fail("Couldn't finish adding entries");
-        } catch(BKException e){
-            LOG.error("BookKeeper exception", e);
-            fail("BookKeeper exception while adding entries");
-        }
-        
+        LedgerHandle afterlh = bkc.openLedger(beforelh.getId(), digestType, 
"".getBytes());
+
         /*
-         * Try to open ledger.
+         * Check if has recovered properly.
          */
-        try{
-            LedgerHandle afterlh = bk.openLedger(beforelh.getId(), 
"".getBytes());
-            
-            /*
-             * Check if has recovered properly.
-             */
-            assertTrue("Has not recovered correctly: " + afterlh.getLast(), 
afterlh.getLast() == 999);
-        } catch (KeeperException e) {
-            LOG.error("Error when opening ledger", e);
-            fail("Couldn't open ledger");
-        } catch (InterruptedException ie) {
-            LOG.error("Interrupted exception", ie);
-            fail("Failure due to interrupted exception");
-        } catch (IOException ioe) {
-            LOG.error("IO Exception", ioe);
-            fail("Failure due to IO exception");
-        } catch (BKException bke){
-            LOG.error("BookKeeper error", bke);
-            fail("BookKeeper error");
-        }
-        
+        assertTrue("Has not recovered correctly: " + 
afterlh.getLastAddConfirmed(),
+                afterlh.getLastAddConfirmed() == numEntries - 1);        
     }
     
     @Test
-    public void testEmptyLedgerRecovery(){
-        /*
-         * Instantiate BookKeeper object.
-         */
-        BookKeeper bk = null;
-        try{
-            bk = new BookKeeper(HOSTPORT);
-        } catch (KeeperException ke){
-            LOG.error("Error instantiating BookKeeper", ke);
-            fail("ZooKeeper error");
-        } catch (IOException ioe){
-            LOG.error(ioe);
-            fail("Failure due to IOException");
-        }
-        
-        /*
-         * Create ledger.
-         */
-        LedgerHandle beforelh = null;
-        try{
-            beforelh = bk.createLedger("".getBytes());
-        } catch (KeeperException ke){
-            LOG.error("Error creating a ledger", ke);
-            fail("ZooKeeper error");            
-        } catch (BKException bke){
-            LOG.error("BookKeeper error");
-            fail("BookKeeper error");
-        } catch (InterruptedException ie) {
-            LOG.error(ie);
-            fail("Failure due to interrupted exception");
-        } catch (IOException ioe) {
-            LOG.error(ioe);
-            fail("Failure due to IO exception");
-        }
-        
-        /*
-         * Write a 1 entry.
-         */
-        try{
-            String tmp = "BookKeeper is cool!";
-            for(int i = 0; i < 1; i++){
-                beforelh.addEntry(tmp.getBytes());
-            }
-        } catch(InterruptedException e){
-            LOG.error("Interrupted when adding entry", e);
-            fail("Couldn't finish adding entries");
-        } catch(BKException e){
-            LOG.error("BookKeeper exception", e);
-            fail("BookKeeper exception while adding entries");
-        }
-        
-        
-        /*
-         * Try to open ledger.
-         */
-        try{
-            LedgerHandle afterlh = bk.openLedger(beforelh.getId(), 
"".getBytes());
-            
-            /*
-             * Check if has recovered properly.
-             */
-            assertTrue("Has not recovered correctly: " + afterlh.getLast(), 
afterlh.getLast() == 0);
-        } catch (KeeperException e) {
-            LOG.error("Error when opening ledger", e);
-            fail("Couldn't open ledger");
-        } catch (InterruptedException ie) {
-            LOG.error("Interrupted exception", ie);
-            fail("Failure due to interrupted exception");
-        } catch (IOException ioe) {
-            LOG.error("IO Exception", ioe);
-            fail("Failure due to IO exception");
-        } catch (BKException bke){
-            LOG.error("BookKeeper error", bke);
-            fail("BookKeeper error");
-        }
-        
+    public void testLedgerRecovery() throws Exception {
+        testInternal(100);
+     
     }
-    
+
+    @Test
+    public void testEmptyLedgerRecoveryOne() throws Exception{
+        testInternal(1);
+    }
+
+    @Test
+    public void testEmptyLedgerRecovery() throws Exception{
+        testInternal(0);
+    }
+
 }

Modified: 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java
 Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.test;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,103 +21,97 @@
  * 
  */
 
-
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.io.IOException;
 import java.lang.InterruptedException;
 import java.util.Arrays;
+import java.util.concurrent.Executors;
 
 import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.log4j.Logger;
-
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 
 /**
  * This class tests BookieClient. It just sends the a new entry to itself.
  * 
  * 
- *
+ * 
  */
 
-
 class LoopbackClient implements WriteCallback {
     Logger LOG = Logger.getLogger(LoopbackClient.class);
     BookieClient client;
     static int recvTimeout = 2000;
     long begin = 0;
     int limit;
-    
-    
+    OrderedSafeExecutor executor;
+
     static class Counter {
         int c;
         int limit;
-        
-        Counter(int limit){
+
+        Counter(int limit) {
             this.c = 0;
             this.limit = limit;
         }
-        
-        synchronized void increment(){
-            if(++c == limit) 
+
+        synchronized void increment() {
+            if (++c == limit)
                 this.notify();
         }
     }
-    
-    LoopbackClient(int port, long begin, int limit)
-    throws IOException {
-        this.client = 
-            new BookieClient(new InetSocketAddress("127.0.0.1", port), 
recvTimeout);
+
+    LoopbackClient(ClientSocketChannelFactory channelFactory, 
OrderedSafeExecutor executor, long begin, int limit) throws IOException {
+        this.client = new BookieClient(channelFactory, executor);
         this.begin = begin;
     }
-    
-    
-    void write(long ledgerId, long entry, byte[] data, WriteCallback cb, 
Object ctx)
-    throws IOException, InterruptedException {
+
+    void write(long ledgerId, long entry, byte[] data, InetSocketAddress addr, 
WriteCallback cb, Object ctx)
+            throws IOException, InterruptedException {
         LOG.info("Ledger id: " + ledgerId + ", Entry: " + entry);
         byte[] passwd = new byte[20];
         Arrays.fill(passwd, (byte) 'a');
-        
-        client.addEntry(ledgerId, 
-            passwd,
-            entry, 
-            ByteBuffer.wrap(data), 
-            cb,
-            ctx);
+
+        client.addEntry(addr, ledgerId, passwd, entry, 
ChannelBuffers.wrappedBuffer(data), cb, ctx);
     }
-    
-    public void writeComplete(int rc, long ledgerId, long entryId, Object ctx){
+
+    public void writeComplete(int rc, long ledgerId, long entryId, 
InetSocketAddress addr, Object ctx) {
         Counter counter = (Counter) ctx;
         counter.increment();
     }
-    
-    
-    public static void main(String args[]){
+
+    public static void main(String args[]) {
         byte[] data = new byte[Integer.parseInt(args[0])];
         Integer limit = Integer.parseInt(args[1]);
         Counter c = new Counter(limit);
         long ledgerId = Long.valueOf("0").longValue();
         long begin = System.currentTimeMillis();
-        
+
         LoopbackClient lb;
-        try{
-            lb = new LoopbackClient(Integer.valueOf(args[2]).intValue(), 
-                    begin, 
-                    limit.intValue());
-        
-            for(int i = 0; i < limit ; i++){
-                lb.write(ledgerId, i, data, lb, c);   
+        ClientSocketChannelFactory channelFactory = new 
NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
+                .newCachedThreadPool());
+        OrderedSafeExecutor executor = new OrderedSafeExecutor(2);
+        try {
+            InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 
Integer.valueOf(args[2]).intValue());
+            lb = new LoopbackClient(channelFactory, executor, begin, 
limit.intValue());
+
+            for (int i = 0; i < limit; i++) {
+                lb.write(ledgerId, i, data, addr, lb, c);
             }
-            
-            synchronized(c){
+
+            synchronized (c) {
                 c.wait();
                 System.out.println("Time to write all entries: " + 
(System.currentTimeMillis() - begin));
             }
-        } catch (IOException e){
+        } catch (IOException e) {
             e.printStackTrace();
-        } catch (InterruptedException e){
+        } catch (InterruptedException e) {
             e.printStackTrace();
         }
-    } 
-    
+    }
+
 }

Modified: 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/NIOServerFactoryTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/NIOServerFactoryTest.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/NIOServerFactoryTest.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/NIOServerFactoryTest.java
 Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.test;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,7 +21,6 @@
  * 
  */
 
-
 import java.net.Socket;
 import java.nio.ByteBuffer;
 
@@ -29,7 +29,6 @@
 import org.apache.bookkeeper.proto.NIOServerFactory.PacketProcessor;
 import org.junit.Test;
 
-
 import junit.framework.TestCase;
 
 public class NIOServerFactoryTest extends TestCase {
@@ -41,8 +40,9 @@
             }
             src.sendResponse(new ByteBuffer[] { ByteBuffer.allocate(4) });
         }
-        
+
     };
+
     @Test
     public void testProblemProcessor() throws Exception {
         NIOServerFactory factory = new NIOServerFactory(22334, 
problemProcessor);


Reply via email to