Author: mahadev
Date: Fri Feb 20 00:23:40 2009
New Revision: 746067

URL: http://svn.apache.org/viewvc?rev=746067&view=rev
Log:
ZOOKEEPER-308. improve the atomic broadcast performance 3x. (breed via mahadev)

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
    
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java
    
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java
    
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java
    
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Feb 20 00:23:40 2009
@@ -10,6 +10,8 @@
   ZOOKEEPER-303. Bin scripts dont work on a Mac. (tom white via mahadev)
  
 IMPROVEMENTS:
+  ZOOKEEPER-308. improve the atomic broadcast performance 3x. (breed via
+mahadev)
 
 NEW FEATURES:
 

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
 Fri Feb 20 00:23:40 2009
@@ -88,7 +88,10 @@
                             + zks.outstandingChanges.get(0).zxid
                             + " is less than current " + request.zxid);
                 }
-                zks.outstandingChanges.remove(0);
+                ZooKeeperServer.ChangeRecord cr = 
zks.outstandingChanges.remove(0);
+                if (zks.outstandingChangesForPath.get(cr.path) == cr) {
+                    zks.outstandingChangesForPath.remove(cr.path);
+                }
             }
             if (request.hdr != null) {
                 rc = zks.dataTree.processTxn(request.hdr, request.txn);

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
 Fri Feb 20 00:23:40 2009
@@ -91,6 +91,7 @@
             super("NIOServerCxn.Factory:" + port);
             setDaemon(true);
             this.ss = ServerSocketChannel.open();
+            ss.socket().setReuseAddress(true);
             ss.socket().bind(new InetSocketAddress(port));
             ss.configureBlocking(false);
             ss.register(selector, SelectionKey.OP_ACCEPT);
@@ -268,6 +269,19 @@
     LinkedList<Request> outstanding = new LinkedList<Request>();
 
     void sendBuffer(ByteBuffer bb) {
+        // We check if write interest here because if it is NOT set, nothing 
is queued, so
+        // we can try to send the buffer right away without waking up the 
selector
+        if ((sk.interestOps()&SelectionKey.OP_WRITE) == 0) {
+            try {
+                sock.write(bb);
+            } catch (IOException e) {
+                // we are just doing best effort right now
+            }
+        }
+        // if there is nothing left to send, we are done
+        if (bb.remaining() == 0) {
+            return;
+        }
         synchronized (factory) {
             sk.selector().wakeup();
             if (LOG.isTraceEnabled()) {
@@ -471,7 +485,7 @@
                 outstandingRequests++;
                 // check throttling
                 if (zk.getInProcess() > factory.outstandingLimit) {
-                    LOG.warn("Throttling recv " + zk.getInProcess());
+                    LOG.debug("Throttling recv " + zk.getInProcess());
                     disableRecv();
                     // following lines should not be needed since we are 
already
                     // reading

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
 Fri Feb 20 00:23:40 2009
@@ -114,12 +114,15 @@
     ChangeRecord getRecordForPath(String path) throws 
KeeperException.NoNodeException {
         ChangeRecord lastChange = null;
         synchronized (zks.outstandingChanges) {
+            lastChange = zks.outstandingChangesForPath.get(path);
+            /*
             for (int i = 0; i < zks.outstandingChanges.size(); i++) {
                 ChangeRecord c = zks.outstandingChanges.get(i);
                 if (c.path.equals(path)) {
                     lastChange = c;
                 }
             }
+            */
             if (lastChange == null) {
                 DataNode n = zks.dataTree.getNode(path);
                 if (n != null) {
@@ -137,6 +140,7 @@
     void addChangeRecord(ChangeRecord c) {
         synchronized (zks.outstandingChanges) {
             zks.outstandingChanges.add(c);
+            zks.outstandingChangesForPath.put(c.path, c);
         }
     }
 

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
 Fri Feb 20 00:23:40 2009
@@ -18,6 +18,7 @@
 
 package org.apache.zookeeper.server;
 
+import java.io.Flushable;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.Random;
@@ -65,6 +66,7 @@
     @Override
     public void run() {
         try {
+            int randRoll = r.nextInt(snapCount/2);
             while (true) {
                 Request si = null;
                 if (toFlush.isEmpty()) {
@@ -82,8 +84,8 @@
                 if (si != null) {
                     zks.getLogWriter().append(si);
                         logCount++;
-                        if (logCount > snapCount / 2
-                                && r.nextInt(snapCount / 2) == 0) {
+                        if (logCount > (snapCount / 2 + randRoll)) {
+                            randRoll = r.nextInt(snapCount/2);
                             // roll the log
                             zks.getLogWriter().rollLog();
                             // take a snapshot
@@ -126,6 +128,9 @@
             Request i = toFlush.remove();
             nextProcessor.processRequest(i);
         }
+        if (nextProcessor instanceof Flushable) {
+            ((Flushable)nextProcessor).flush();
+        }
     }
 
     public void shutdown() {

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
 Fri Feb 20 00:23:40 2009
@@ -24,6 +24,7 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
@@ -118,6 +119,9 @@
 
     int requestsInProcess;
     List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
+    // this data structure must be accessed under the outstandingChanges lock
+    HashMap<String, ChangeRecord> outstandingChangesForPath = new 
HashMap<String, ChangeRecord>();
+    
     private NIOServerCnxn.Factory serverCnxnFactory;
 
     private final ServerStats serverStats;
@@ -571,7 +575,7 @@
         try {
             return Integer.parseInt(sc);
         } catch (Exception e) {
-            return 10000;
+            return 100000;
         }
     }
 

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
 Fri Feb 20 00:23:40 2009
@@ -18,6 +18,7 @@
 package org.apache.zookeeper.server.persistence;
 
 import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.File;
@@ -49,8 +50,10 @@
  */
 public class FileTxnLog implements TxnLog {
     long lastZxidSeen;
-    volatile FileOutputStream logStream = null;
+    volatile BufferedOutputStream logStream = null;
     volatile OutputArchive oa;
+    volatile FileOutputStream fos = null;
+    
     
     File logDir;
     public final static int TXNLOG_MAGIC =
@@ -104,10 +107,14 @@
 
     /**
      * rollover the current log file to a new one.
+     * @throws IOException 
      */
-    public void rollLog() {
-        this.logStream = null;
-        oa = null;
+    public void rollLog() throws IOException {
+        if (logStream != null) {
+            this.logStream.flush();
+            this.logStream = null;
+            oa = null;
+        }
     }
 
     /**
@@ -123,17 +130,18 @@
                         + " is <= " + lastZxidSeen + " for "
                         + hdr.getType());
             }
-           if (logStream==null) {
+            if (logStream==null) {
                logFileWrite = new File(logDir, ("log." + 
                        Long.toHexString(hdr.getZxid())));
-               logStream=new FileOutputStream(logFileWrite);
+               fos = new FileOutputStream(logFileWrite);
+               logStream=new BufferedOutputStream(fos);
                oa = BinaryOutputArchive.getArchive(logStream);
                FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
                fhdr.serialize(oa, "fileheader");
-               currentSize = logStream.getChannel().position();
-               streamsToFlush.add(logStream);
+               currentSize = fos.getChannel().position();
+               streamsToFlush.add(fos);
             }
-            padFile(logStream);
+            padFile(fos);
             byte[] buf = Util.marshallTxnEntry(hdr, txn);
             if (buf == null || buf.length == 0) {
                 throw new IOException("Faulty serialization for header " +
@@ -229,6 +237,9 @@
      * disk
      */
     public synchronized void commit() throws IOException {
+        if (logStream != null) {
+            logStream.flush();
+        }
         for (FileOutputStream log : streamsToFlush) {
             log.flush();
             if (forceSync) {

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
 Fri Feb 20 00:23:40 2009
@@ -268,8 +268,9 @@
 
     /**
      * roll the transaction logs
+     * @throws IOException 
      */
-    public void rollLog() {
+    public void rollLog() throws IOException {
         txnLog.rollLog();
     }
     

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java
 Fri Feb 20 00:23:40 2009
@@ -32,8 +32,9 @@
     /**
      * roll the current
      * log being appended to
+     * @throws IOException 
      */
-    void rollLog();
+    void rollLog() throws IOException;
     /**
      * Append a request to the transaction log
      * @param hdr the transaction header

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
 Fri Feb 20 00:23:40 2009
@@ -180,7 +180,9 @@
             queuedRequests.clear();
             notifyAll();
         }
-        nextProcessor.shutdown();
+        if (nextProcessor != null) {
+            nextProcessor.shutdown();
+        }
     }
 
 }

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
 Fri Feb 20 00:23:40 2009
@@ -43,6 +43,7 @@
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -51,6 +52,11 @@
 public class Follower {
     private static final Logger LOG = Logger.getLogger(Follower.class);
 
+    static final private boolean nodelay = 
System.getProperty("follower.nodelay", "true").equals("true");
+    static {
+        LOG.info("TCP NoDelay set to: " + nodelay);
+    }
+
     QuorumPeer self;
 
     FollowerZooKeeperServer zk;
@@ -85,15 +91,14 @@
      *                the proposal packet to be sent to the leader
      * @throws IOException
      */
-    void writePacket(QuorumPacket pp) throws IOException {
-        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
-        if (pp.getType() == Leader.PING) {
-            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
-        }
-        ZooTrace.logQuorumPacket(LOG, traceMask, 'o', pp);
+    void writePacket(QuorumPacket pp, boolean flush) throws IOException {
         synchronized (leaderOs) {
-            leaderOs.writeRecord(pp, "packet");
-            bufferedOutput.flush();
+            if (pp != null) {
+                leaderOs.writeRecord(pp, "packet");
+            }
+            if (flush) {
+                bufferedOutput.flush();
+            }
         }
     }
 
@@ -147,7 +152,7 @@
                         //sock = new Socket();
                         //sock.setSoTimeout(self.tickTime * self.initLimit);
                         sock.connect(addr, self.tickTime * self.syncLimit);
-                        sock.setTcpNoDelay(true);
+                        sock.setTcpNoDelay(nodelay);
                         break;
                     } catch (ConnectException e) {
                         if (tries == 4) {
@@ -169,7 +174,7 @@
                 qp.setType(Leader.LASTZXID);
                 long sentLastZxid = self.getLastLoggedZxid();
                 qp.setZxid(sentLastZxid);
-                writePacket(qp);
+                writePacket(qp, true);
                 readPacket(qp);
                 long newLeaderZxid = qp.getZxid();
     
@@ -214,7 +219,7 @@
                     zk.dataTree.lastProcessedZxid = newLeaderZxid;
                 }
                 ack.setZxid(newLeaderZxid & ~0xffffffffL);
-                writePacket(ack);
+                writePacket(ack, true);
                 sock.setSoTimeout(self.tickTime * self.syncLimit);
                 zk.startup();
                 while (self.running) {
@@ -231,7 +236,7 @@
                             dos.writeInt(entry.getValue());
                         }
                         qp.setData(bos.toByteArray());
-                        writePacket(qp);
+                        writePacket(qp, true);
                         break;
                     case Leader.PROPOSAL:
                         TxnHeader hdr = new TxnHeader();
@@ -334,7 +339,7 @@
                                  ZooTrace.SESSION_TRACE_MASK,
                                  "To validate session 0x"
                                  + Long.toHexString(clientId));
-        writePacket(qp);
+        writePacket(qp, true);
     }
 
     /**
@@ -370,7 +375,7 @@
 //        qp = new QuorumPacket(Leader.REQUEST, -1, baos
 //                .toByteArray(), request.authInfo);
 //        }
-        writePacket(qp);
+        writePacket(qp, true);
     }
 
     public long getZxid() {

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
 Fri Feb 20 00:23:40 2009
@@ -97,20 +97,23 @@
     private void sendPackets() throws InterruptedException {
         long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
         while (true) {
-            QuorumPacket p;
-            p = queuedPackets.take();
-
-            if (p == proposalOfDeath) {
-                // Packet of death!
-                break;
-            }
-            if (p.getType() == Leader.PING) {
-                traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
-            }
-            ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
             try {
+                QuorumPacket p;
+                p = queuedPackets.poll();
+                if (p == null) {
+                    bufferedOutput.flush();
+                    p = queuedPackets.take();
+                }
+
+                if (p == proposalOfDeath) {
+                    // Packet of death!
+                    break;
+                }
+                if (p.getType() == Leader.PING) {
+                    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+                }
+                ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
                 oa.writeRecord(p, "packet");
-                bufferedOutput.flush();
             } catch (IOException e) {
                 if (!sock.isClosed()) {
                     LOG.warn("Unexpected exception",e);

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
 Fri Feb 20 00:23:40 2009
@@ -26,11 +26,14 @@
 import java.net.SocketAddress;
 import java.net.SocketException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.log4j.Logger;
@@ -43,6 +46,11 @@
  */
 public class Leader {
     private static final Logger LOG = Logger.getLogger(Leader.class);
+    
+    static final private boolean nodelay = 
System.getProperty("leader.nodelay", "true").equals("true");
+    static {
+        LOG.info("TCP NoDelay set to: " + nodelay);
+    }
 
     static public class Proposal {
         public QuorumPacket packet;
@@ -191,7 +199,7 @@
      */
     final static int SYNC = 7;
      
-    private ConcurrentLinkedQueue<Proposal> outstandingProposals = new 
ConcurrentLinkedQueue<Proposal>();
+    private ConcurrentMap<Long, Proposal> outstandingProposals = new 
ConcurrentHashMap<Long, Proposal>();
 
     ConcurrentLinkedQueue<Proposal> toBeApplied = new 
ConcurrentLinkedQueue<Proposal>();
 
@@ -207,7 +215,7 @@
                     try{
                         Socket s = ss.accept();
                         s.setSoTimeout(self.tickTime * self.syncLimit);
-                        s.setTcpNoDelay(true);
+                        s.setTcpNoDelay(nodelay);
                         new FollowerHandler(s, Leader.this);
                     } catch (SocketException e) {
                         if (stop) {
@@ -257,7 +265,7 @@
                 LOG.info("NEWLEADER proposal has Zxid of "
                         + 
Long.toHexString(newLeaderProposal.packet.getZxid()));
             }
-            outstandingProposals.add(newLeaderProposal);
+            outstandingProposals.put(newLeaderProposal.packet.getZxid(), 
newLeaderProposal);
             
             // Start thread that waits for connection requests from 
             // new followers.
@@ -341,7 +349,9 @@
         LOG.info("Shutdown called",
                 new Exception("shutdown Leader! reason: " + reason));
 
-        cnxAcceptor.halt();
+        if (cnxAcceptor != null) {
+            cnxAcceptor.halt();
+        }
         
         // NIO should not accept conenctions
         self.cnxnFactory.setZooKeeperServer(null);
@@ -380,7 +390,7 @@
         
         if (LOG.isDebugEnabled()) {
             LOG.debug("Ack zxid: 0x" + Long.toHexString(zxid));
-            for (Proposal p : outstandingProposals) {
+            for (Proposal p : outstandingProposals.values()) {
                 long packetZxid = p.packet.getZxid();
                 LOG.debug("outstanding proposal: 0x"
                         + Long.toHexString(packetZxid));
@@ -394,57 +404,56 @@
             }
             return;
         }
-        if (outstandingProposals.peek().packet.getZxid() > zxid) {
+        if (lastCommitted >= zxid) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("proposal has already been committed, pzxid:"
-                        + outstandingProposals.peek().packet.getZxid()
-                        + " zxid:" + zxid);
+                        + lastCommitted
+                        + " zxid: 0x" + Long.toHexString(zxid));
             }
             // The proposal has already been committed
             return;
         }
-        for (Proposal p : outstandingProposals) {
-            long packetZxid = p.packet.getZxid();
-            if (packetZxid == zxid) {
-                p.ackCount++;
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid)
-                            + " is " + p.ackCount);
-                }
+        Proposal p = outstandingProposals.get(zxid);
+        if (p == null) {
+            LOG.warn("Trying to commit future proposal: zxid 0x"
+                    + Long.toHexString(zxid) + " from " + followerAddr);
+            return;
+        }
+        p.ackCount++;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid)
+                    + " is " + p.ackCount);
+        }
                 
-                if (p.ackCount > self.quorumPeers.size() / 2){
-                    if (!first) {
-                        LOG.fatal("Commiting zxid 0x" + Long.toHexString(zxid)
-                                + " from " + followerAddr + " not first!");
-                        LOG.fatal("First is "
-                                + outstandingProposals.element().packet);
-                        System.exit(13);
-                    }
-                    outstandingProposals.remove();
-                    if (p.request != null) {
-                        toBeApplied.add(p);
-                    }
-                    // We don't commit the new leader proposal
-                    if ((zxid & 0xffffffffL) != 0) {
-                        if (p.request == null) {
-                            LOG.warn("Going to commmit null: " + p);
-                        }
-                        commit(zxid);
-                        zk.commitProcessor.commit(p.request);
-                        if(pendingSyncs.containsKey(zxid)){
-                           for(FollowerSyncRequest r: 
pendingSyncs.remove(zxid)) {
-                               sendSync(r);
-                        }
-                    }
+        if (p.ackCount > self.quorumPeers.size() / 2){
+            if (zxid != lastCommitted+1) {
+                LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
+                        + " from " + followerAddr + " not first!");
+                LOG.warn("First is "
+                        + (lastCommitted+1));
+                //System.exit(13);
+            }
+            outstandingProposals.remove(zxid);
+            if (p.request != null) {
+                toBeApplied.add(p);
+            }
+            // We don't commit the new leader proposal
+            if ((zxid & 0xffffffffL) != 0) {
+                if (p.request == null) {
+                    LOG.warn("Going to commmit null: " + p);
                 }
+                commit(zxid);
+                zk.commitProcessor.commit(p.request);
+                if(pendingSyncs.containsKey(zxid)){
+                    for(FollowerSyncRequest r: pendingSyncs.remove(zxid)) {
+                        sendSync(r);
+                    }
                 }
                 return;
             } else {
-                first = false;
+                lastCommitted = zxid;
             }
         }
-        LOG.warn("Trying to commit future proposal: zxid 0x"
-                + Long.toHexString(zxid) + " from " + followerAddr);
     }
 
     static class ToBeAppliedRequestProcessor implements RequestProcessor {
@@ -514,7 +523,7 @@
         }
     }
 
-    long lastCommitted;
+    long lastCommitted = -1;
 
     /**
      * Create a commit packet and send it to all the members of the quorum
@@ -537,7 +546,6 @@
      */
     public Proposal propose(Request request) {
         
-        
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
         try {
@@ -549,8 +557,8 @@
         } catch (IOException e) {
             LOG.warn("This really should be impossible", e);
         }
-        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, baos
-                .toByteArray(), null);
+        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, 
+                baos.toByteArray(), null);
         
         Proposal p = new Proposal();
         p.packet = pp;
@@ -560,8 +568,8 @@
                 LOG.debug("Proposing:: " + request);
             }
 
-            outstandingProposals.add(p);
             lastProposed = p.packet.getZxid();
+            outstandingProposals.put(lastProposed, p);
             sendPacket(pp);
         }
         return p;
@@ -622,11 +630,13 @@
                         .getZxid(), null, null);
                 handler.queuePacket(qp);
             }
-            for (Proposal p : outstandingProposals) {
-                if (p.packet.getZxid() <= lastSeenZxid) {
+            List<Long>zxids = new 
ArrayList<Long>(outstandingProposals.keySet());
+            Collections.sort(zxids);
+            for (Long zxid: zxids) {
+                if (zxid <= lastSeenZxid) {
                     continue;
                 }
-                handler.queuePacket(p.packet);
+                handler.queuePacket(outstandingProposals.get(zxid).packet);
             }
         }
         synchronized (forwardingFollowers) {

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
 Fri Feb 20 00:23:40 2009
@@ -18,6 +18,7 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.Flushable;
 import java.io.IOException;
 
 import org.apache.log4j.Logger;
@@ -26,7 +27,7 @@
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 
-public class SendAckRequestProcessor implements RequestProcessor {
+public class SendAckRequestProcessor implements RequestProcessor, Flushable {
     private static final Logger LOG = 
Logger.getLogger(SendAckRequestProcessor.class);
     
     Follower follower;
@@ -40,12 +41,16 @@
             QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), 
null,
                 null);
             try {
-                follower.writePacket(qp);
+                follower.writePacket(qp, false);
             } catch (IOException e) {
                 LOG.warn("Ignoring unexpected exception during packet send", 
e);
             }
         }
     }
+    
+    public void flush() throws IOException {
+        follower.writePacket(null, true);
+    }
 
     public void shutdown() {
         // Nothing needed

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
 Fri Feb 20 00:23:40 2009
@@ -78,17 +78,25 @@
 
     public static void deserializeSnapshot(DataTree dt,InputArchive ia,
             Map<Long, Integer> sessions) throws IOException {
-        int count = ia.readInt("count");
-        while (count > 0) {
-            long id = ia.readLong("id");
-            int to = ia.readInt("timeout");
-            sessions.put(id, to);
-            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
-                                     "loadData --- session in archive: " + id
-                                     + " with timeout: " + to);
-            count--;
+        try {
+            int count = ia.readInt("count");
+            while (count > 0) {
+                long id = ia.readLong("id");
+                int to = ia.readInt("timeout");
+                sessions.put(id, to);
+                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
+                                         "loadData --- session in archive: " + 
id
+                                         + " with timeout: " + to);
+                count--;
+            }
+            dt.deserialize(ia, "tree");
+        } catch(IOException e) {
+            throw e;
+        } catch(Exception e) {
+            IOException ioe = new IOException(e.getMessage());
+            ioe.initCause(e);
+            throw ioe;
         }
-        dt.deserialize(ia, "tree");
     }
 
     public static void serializeSnapshot(DataTree dt,OutputArchive oa,

Modified: 
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java
 Fri Feb 20 00:23:40 2009
@@ -63,10 +63,10 @@
 
     static Map<Long, Long> totalByTime = new HashMap<Long, Long>();
 
-    static long currentInterval;
+    volatile static long currentInterval;
 
     static long lastChange;
-
+    
     static PrintStream sf;
     static PrintStream tf;
     static {
@@ -208,7 +208,7 @@
                     count = count * 1000 / INTERVAL; // Multiply by 1000 to get
                                                      // reqs/sec
                     if (lastChange != 0
-                            && (lastChange + INTERVAL * 4 + 5000) < now) {
+                            && (lastChange + INTERVAL * 3) < now) {
                         // We only want to print anything if things have had a
                         // chance to change
 
@@ -264,6 +264,8 @@
 
     static public class GeneratorInstance implements Instance {
 
+        byte bytes[];
+        
         int percentage = -1;
 
         int errors;
@@ -319,7 +321,6 @@
 
             public void run() {
                 try {
-                    byte bytes[] = new byte[1024];
                     zk = new ZooKeeper(host, 60000, this);
                     synchronized (this) {
                         if (!connected) {
@@ -461,6 +462,15 @@
                     try {
                         String parts[] = params.split(" ");
                         String hostPort[] = parts[1].split(":");
+                        int bytesSize = 1024;
+                        if (parts.length == 3) {
+                            try {
+                                bytesSize = Integer.parseInt(parts[2]);
+                            } catch(Exception e) {
+                                System.err.println("Not an integer: " + 
parts[2]);
+                            }
+                        }
+                        bytes = new byte[bytesSize];
                         s = new Socket(hostPort[0], 
Integer.parseInt(hostPort[1]));
                         zkThread = new ZooKeeperThread(parts[0]);
                         sendThread = new SenderThread(s);
@@ -545,12 +555,16 @@
     }
 
     private static boolean leaderOnly;
+    private static boolean leaderServes;
     
     private static String []processOptions(String args[]) {
         ArrayList<String> newArgs = new ArrayList<String>();
         for(String a: args) {
             if (a.equals("--leaderOnly")) {
                 leaderOnly = true;
+                leaderServes = true;
+            } else if (a.equals("--leaderServes")) {
+                leaderServes = true;
             } else {
                 newArgs.add(a);
             }
@@ -571,7 +585,7 @@
             NoAssignmentException {
 
         args = processOptions(args);
-        if (args.length == 4) {
+        if (args.length == 5) {
             try {
                 StatusWatcher statusWatcher = new StatusWatcher();
                 ZooKeeper zk = new ZooKeeper(args[0], 15000, statusWatcher);
@@ -587,7 +601,7 @@
                 StringBuilder quorumHostPort = new StringBuilder();
                 StringBuilder zkHostPort = new StringBuilder();
                 for (int i = 0; i < serverCount; i++) {
-                    String r[] = QuorumPeerInstance.createServer(im, i);
+                    String r[] = QuorumPeerInstance.createServer(im, i, 
leaderServes);
                     if (i > 0) {
                         quorumHostPort.append(',');
                         zkHostPort.append(',');
@@ -694,7 +708,7 @@
 
     private static void doUsage() {
         System.err.println("USAGE: " + GenerateLoad.class.getName()
-                + " [--leaderOnly] zookeeper_host:port containerPrefix 
#ofServers #ofClients");
+                + " [--leaderOnly] [--leaderServes] zookeeper_host:port 
containerPrefix #ofServers #ofClients requestSize");
         System.exit(2);
     }
 }

Modified: 
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java
 Fri Feb 20 00:23:40 2009
@@ -60,18 +60,21 @@
             File zkDirs = new File(tmpDir, "zktmp.cfg");
             logDir = tmpDir;
             snapDir = tmpDir;
+            Properties p;
             if (zkDirs.exists()) {
-                Properties p = new Properties();
+                p = new Properties();
                 p.load(new FileInputStream(zkDirs));
-                logDir = new File(p.getProperty("logDir", 
tmpDir.getAbsolutePath()));
-                snapDir = new File(p.getProperty("snapDir", 
tmpDir.getAbsolutePath()));
+            } else {
+                p = System.getProperties();
             }
+            logDir = new File(p.getProperty("logDir", 
tmpDir.getAbsolutePath()));
+            snapDir = new File(p.getProperty("snapDir", 
tmpDir.getAbsolutePath()));
             logDir = File.createTempFile("zktst", ".dir", logDir);
             logDir.delete();
-            logDir.mkdir();
+            logDir.mkdirs();
             snapDir = File.createTempFile("zktst", ".dir", snapDir);
             snapDir.delete();
-            snapDir.mkdir();
+            snapDir.mkdirs();
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -79,12 +82,18 @@
 
     public void configure(String params) {
         if (clientAddr == null) {
+            String parts[] = params.split(" ");
             // The first time we are configured, it is just to tell
             // us which machine we are
-            serverId = Integer.parseInt(params);
+            serverId = Integer.parseInt(parts[0]);
             if (LOG.isDebugEnabled()) {
                 LOG.info("Setting up server " + serverId);
             }
+            if (parts.length > 1 && parts[1].equals("false")) {
+                System.setProperty("zookeeper.leaderServes", "no");
+            } else {
+                System.setProperty("zookeeper.leaderServes", "yes");
+            }
             // Let's grab two ports
             try {
                 ServerSocket ss = new ServerSocket(0, 1, 
InetAddress.getLocalHost());
@@ -155,6 +164,7 @@
                     LOG.warn("Peer " + serverId + " already started");
                     return;
                 }
+                System.err.println("SnapDir = " + snapDir + " LogDir = " + 
logDir);
                 peer = new QuorumPeer(peers, snapDir, logDir, 
clientAddr.getPort(), 0, serverId, tickTime, initLimit, syncLimit);
                 peer.start();
                 for(int i = 0; i < 5; i++) {
@@ -212,7 +222,22 @@
      * @throws KeeperException
      */
     public static String[] createServer(InstanceManager im, int i) throws 
NoAvailableContainers, DuplicateNameException, InterruptedException, 
KeeperException {
-        im.assignInstance("server"+i, QuorumPeerInstance.class, 
Integer.toString(i), 50);
+        return createServer(im, i, true);
+    }
+    
+    /**
+     * This method is used to configure a QuorumPeerInstance
+     * 
+     * @param im the InstanceManager that will be managing the new instance
+     * @param i the server number to configure (should be zero based)
+     * @param leaderServes if false, the leader will not accept client 
connections
+     * @throws NoAvailableContainers
+     * @throws DuplicateNameException
+     * @throws InterruptedException
+     * @throws KeeperException
+     */
+    public static String[] createServer(InstanceManager im, int i, boolean 
leaderServes) throws NoAvailableContainers, DuplicateNameException, 
InterruptedException, KeeperException {
+        im.assignInstance("server"+i, QuorumPeerInstance.class, 
Integer.toString(i) + " " + leaderServes, 50);
         return im.getStatus("server"+i, 3000).split(",");
         
     }

Modified: 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java
 Fri Feb 20 00:23:40 2009
@@ -43,7 +43,7 @@
     ArrayList<LEThread> threads;
     File tmpdir[];
     int port[];
-    int[] round;
+    volatile int [] round;
     
     @Override
     public void setUp() throws Exception {

Modified: 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java?rev=746067&r1=746066&r2=746067&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java
 Fri Feb 20 00:23:40 2009
@@ -44,7 +44,7 @@
 
     private static String HOSTPORT = "127.0.0.1:2344";
 
-    private CountDownLatch startSignal;
+    private volatile CountDownLatch startSignal;
 
     @Override
     protected void setUp() throws Exception {
@@ -86,7 +86,11 @@
                        ClientBase.waitForServerUp(HOSTPORT,
                                        CONNECTION_TIMEOUT));
 
+            startSignal = new CountDownLatch(1);
             ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+            startSignal.await(CONNECTION_TIMEOUT,
+                    TimeUnit.MILLISECONDS);
+            assertTrue("count == 0", startSignal.getCount() == 0);
             String path;
             LOG.info("starting creating nodes");
             for (int i = 0; i < 10; i++) {


Reply via email to