Author: fpj
Date: Fri Oct  3 06:06:39 2008
New Revision: 701369

URL: http://svn.apache.org/viewvc?rev=701369&view=rev
Log:
ZOOKEEPER-136.patch

Added:
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerSyncRequest.java
Modified:
    hadoop/zookeeper/trunk/src/java/main/org/apache/jute/CsvOutputArchive.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/ProviderRegistry.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOps.java
    
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOpsTest.java

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/jute/CsvOutputArchive.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/jute/CsvOutputArchive.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/jute/CsvOutputArchive.java 
(original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/jute/CsvOutputArchive.java 
Fri Oct  3 06:06:39 2008
@@ -103,6 +103,9 @@
     }
     
     public void writeRecord(Record r, String tag) throws IOException {
+        if (r == null) {
+            return;
+        }
         r.serialize(this, tag);
     }
     

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java 
(original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java 
Fri Oct  3 06:06:39 2008
@@ -73,8 +73,7 @@
 public class ClientCnxn {
     private static final Logger LOG = Logger.getLogger(ClientCnxn.class);
 
-    private ArrayList<InetSocketAddress> serverAddrs =
-        new ArrayList<InetSocketAddress>();
+    private ArrayList<InetSocketAddress> serverAddrs = new ArrayList<InetSocketAddress>();
 
     static class AuthData {
         AuthData(String scheme, byte data[]) {
@@ -121,10 +120,12 @@
 
     final Selector selector = Selector.open();
     
-    /** Set to true when close is called. Latches the connection such that
-     * we don't attempt to re-connect to the server if in the middle of
-     * closing the connection (client sends session disconnect to server
-     * as part of close operation) */
+    /**
+     * Set to true when close is called. Latches the connection such that we
+     * don't attempt to re-connect to the server if in the middle of closing the
+     * connection (client sends session disconnect to server as part of close
+     * operation)
+     */
     volatile boolean closing = false;
 
     public long getSessionId() {
@@ -138,7 +139,8 @@
     @Override
     public String toString() {
         StringBuffer sb = new StringBuffer();
-        sb.append("sessionId: 0x").append(Long.toHexString(getSessionId())).append("\n");
+        sb.append("sessionId: 0x").append(Long.toHexString(getSessionId()))
+          .append("\n");
         sb.append("lastZxid: ").append(lastZxid).append("\n");
         sb.append("xid: ").append(xid).append("\n");
         sb.append("nextAddrToTry: ").append(nextAddrToTry).append("\n");
@@ -200,8 +202,25 @@
             }
             this.watchRegistration = watchRegistration;
         }
+
+        @Override
+        public String toString() {
+            StringBuffer sb = new StringBuffer();
+
+            sb.append("path:" + path);
+            sb.append(" finished:" + finished);
+
+            sb.append(" header:: " + header);
+            sb.append(" replyHeader:: " + replyHeader);
+            sb.append(" request:: " + request);
+            sb.append(" response:: " + response);
+
+            // jute toString is horrible, remove unnecessary newlines
+            return sb.toString().replaceAll("\r*\n+", " ");
+        }
     }
 
+
     public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
             ClientWatchManager watcher)
         throws IOException
@@ -224,8 +243,7 @@
      */
     public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
             ClientWatchManager watcher, long sessionId, byte[] sessionPasswd)
-        throws IOException
-    {
+        throws IOException {
         this.zooKeeper = zooKeeper;
         this.watcher = watcher;
         this.sessionId = sessionId;
@@ -477,20 +495,28 @@
             ByteBufferInputStream bbis = new ByteBufferInputStream(
                     incomingBuffer);
             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
-            ReplyHeader r = new ReplyHeader();
+            ReplyHeader replyHdr = new ReplyHeader();
 
-            r.deserialize(bbia, "header");
-            if (r.getXid() == -2) {
+            replyHdr.deserialize(bbia, "header");
+            if (replyHdr.getXid() == -2) {
                 // -2 is the xid for pings
+                LOG
+                    .debug("Got ping sessionid:0x"
+                    + Long.toHexString(sessionId));
                 return;
             }
-            if (r.getXid() == -4) {
+            if (replyHdr.getXid() == -4) {
                 // -2 is the xid for AuthPacket
                 // TODO: process AuthPacket here
+                LOG
+                    .debug("Got auth sessionid:0x"
+                        + Long.toHexString(sessionId));
                 return;
             }
-            if (r.getXid() == -1) {
+            if (replyHdr.getXid() == -1) {
                 // -1 means notification
+                LOG.debug("Got notification sessionid:0x"
+                    + Long.toHexString(sessionId));
                 WatcherEvent event = new WatcherEvent();
                 event.deserialize(bbia, "response");
                 WatchedEvent we = new WatchedEvent(event);
@@ -504,29 +530,37 @@
             }
             if (pendingQueue.size() == 0) {
                 throw new IOException("Nothing in the queue, but got "
-                        + r.getXid());
+                        + replyHdr.getXid());
             }
-            Packet p = null;
+            Packet packet = null;
             synchronized (pendingQueue) {
-                p = pendingQueue.remove();
+                packet = pendingQueue.remove();
             }
             /*
              * Since requests are processed in order, we better get a response
              * to the first request!
              */
-            if (p.header.getXid() != r.getXid()) {
-                throw new IOException("Xid out of order. Got " + r.getXid()
-                        + " expected " + p.header.getXid());
-            }
-            p.replyHeader.setXid(r.getXid());
-            p.replyHeader.setErr(r.getErr());
-            p.replyHeader.setZxid(r.getZxid());
-            lastZxid = r.getZxid();
-            if (p.response != null && r.getErr() == 0) {
-                p.response.deserialize(bbia, "response");
+         if (packet.header.getXid() != replyHdr.getXid()) {
+         throw new IOException("Xid out of order. Got "
+                  + replyHdr.getXid() + " expected "
+                  + packet.header.getXid());
             }
-            p.finished = true;
-            finishPacket(p);
+
+         packet.replyHeader.setXid(replyHdr.getXid());
+         packet.replyHeader.setErr(replyHdr.getErr());
+         packet.replyHeader.setZxid(replyHdr.getZxid());
+         lastZxid = replyHdr.getZxid();
+         if (packet.response != null && replyHdr.getErr() == 0) {
+             packet.response.deserialize(bbia, "response");
+            }
+            packet.finished = true;
+
+            if (LOG.isDebugEnabled()) {
+            LOG.debug("Reading reply sessionid:0x"
+                + Long.toHexString(sessionId) + ", packet:: " + packet);
+        }
+
+            finishPacket(packet);
         }
 
         /**
@@ -789,14 +823,15 @@
                 } catch (Exception e) {
                     if (closing) {
                         // closing so this is expected
-                        LOG.info("Exception while closing send thread for session 0x" 
+                        LOG
+                           .info("Exception while closing send thread for session 0x"
                                 + Long.toHexString(getSessionId())
                                 + " : " + e.getMessage());
                         break;
                     } else {
                         LOG.warn("Exception closing session 0x" 
-                                + Long.toHexString(getSessionId()),
-                                e);
+                                + Long.toHexString(getSessionId()) + " to "
+                                + sockKey, e);
                         cleanup();
                         if (zooKeeper.state.isAlive()) {
                             eventThread.queueEvent(new WatchedEvent(
@@ -889,8 +924,8 @@
     }
 
     /**
-     * Close the connection, which includes; send session disconnect to
-     * the server, shutdown the send/event threads.
+     * Close the connection, which includes; send session disconnect to the
+     * server, shutdown the send/event threads.
      * 
      * @throws IOException
      */
@@ -919,13 +954,10 @@
     }
 
     public ReplyHeader submitRequest(RequestHeader h, Record request,
-            Record response,
-            WatchRegistration watchRegistration)
-        throws InterruptedException
-    {
+            Record response, WatchRegistration watchRegistration)
+            throws InterruptedException {
         ReplyHeader r = new ReplyHeader();
-        Packet packet =
-            queuePacket(h, r, request, response, null, null, null,
+        Packet packet = queuePacket(h, r, request, response, null, null, null,
                     watchRegistration);
         synchronized (packet) {
             while (!packet.finished) {
@@ -937,8 +969,7 @@
 
     Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
             Record response, AsyncCallback cb, String path, Object ctx,
-            WatchRegistration watchRegistration)
-    {
+            WatchRegistration watchRegistration) {
         Packet packet = null;
         synchronized (outgoingQueue) {
             if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
 Fri Oct  3 06:06:39 2008
@@ -69,9 +69,9 @@
     }
 
     public void processRequest(Request request) {
-        // LOG.info("Zoo>>> cxid = " + request.cxid + " type = " +
-        // request.type + " id = " + request.sessionId + " cnxn " +
-        // request.cnxn);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Processing request:: " + request);
+        }
         // request.addRQRec(">final");
         long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
         if (request.type == OpCode.ping) {
@@ -130,6 +130,10 @@
             if (request.hdr != null && request.hdr.getType() == OpCode.error) {
                 throw KeeperException.create(((ErrorTxn) request.txn).getErr());
             }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(request);
+            }
             switch (request.type) {
             case OpCode.ping:
                 request.cnxn.sendResponse(new ReplyHeader(-2,
@@ -157,7 +161,6 @@
                 err = rc.err;
                 break;
             case OpCode.sync:
-                LOG.debug("OpCode.sync " + request);
                 SyncRequest syncRequest = new SyncRequest();
                 ZooKeeperServer.byteBuffer2Record(request.request,
                         syncRequest);

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
 Fri Oct  3 06:06:39 2008
@@ -84,7 +84,7 @@
         int outstandingLimit = 1;
 
         public Factory(int port) throws IOException {
-            super("NIOServerCxn.Factory");
+            super("NIOServerCxn.Factory:" + port);
             setDaemon(true);
             this.ss = ServerSocketChannel.open();
             ss.socket().bind(new InetSocketAddress(port));
@@ -421,7 +421,7 @@
                     || ap.handleAuthentication(this, authPacket.getAuth()) != KeeperException.Code.Ok) {
                 if (ap == null)
                     LOG.error("No authentication provider for scheme: "
-                            + scheme);
+                            + scheme + " has " + ProviderRegistry.listProviders());
                 else
                     LOG.debug("Authentication failed for scheme: "
                             + scheme);
@@ -449,6 +449,7 @@
                 outstandingRequests++;
                 // check throttling
                 if (zk.getInProcess() > factory.outstandingLimit) {
+                    LOG.warn("Throttling recv " + zk.getInProcess());
                     disableRecv();
                     // following lines should not be needed since we are already
                     // reading

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
 Fri Oct  3 06:06:39 2008
@@ -76,7 +76,7 @@
 
     public PrepRequestProcessor(ZooKeeperServer zks,
             RequestProcessor nextProcessor) {
-        super("ProcessThread");
+        super("ProcessThread:" + zks.getClientPort());
         this.nextProcessor = nextProcessor;
         this.zks = zks;
 

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java 
(original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java 
Fri Oct  3 06:06:39 2008
@@ -163,19 +163,19 @@
     @Override
     public String toString() {
         StringBuffer sb = new StringBuffer();
-        sb.append("session 0x").append(Long.toHexString(sessionId));
-        sb.append(" cxid 0x").append(Long.toHexString(cxid));
-        sb.append("zxid 0x").append(Long.toHexString((hdr == null ? -2 : hdr.getZxid()))).append(
-                " ");
-        sb
-                .append(
-                        " txn type = "
-                                + (hdr == null ? "unknown" : "" + hdr.getType()))
-                .append(" ");
-        sb.append(op2String(type)).append(" ");
+        sb.append("sessionid:0x").append(Long.toHexString(sessionId));
+        sb.append(" type:").append(op2String(type));
+        sb.append(" cxid:0x").append(Long.toHexString(cxid));
+        sb.append(" zxid:0x").append(Long.toHexString((hdr == null ?
+                -2 : hdr.getZxid())));
+        sb.append(" txntype:" + (hdr == null ?
+                "unknown" : "" + hdr.getType()));
+        sb.append(" ");
 
         String path = "n/a";
-        if (type != OpCode.createSession) {
+        if (type != OpCode.createSession && request != null
+                && request.remaining() >= 4)
+        {
             try {
                 request.clear();
                 int pathLen = request.getInt();
@@ -188,6 +188,7 @@
             }
         }
         sb.append(path).append(" ");
+
         return sb.toString();
     }
 }

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
 Fri Oct  3 06:06:39 2008
@@ -54,7 +54,7 @@
 
     public SyncRequestProcessor(ZooKeeperServer zks,
             RequestProcessor nextProcessor) {
-        super("SyncThread");
+        super("SyncThread:" + zks.getClientPort());
         this.zks = zks;
         this.nextProcessor = nextProcessor;
         start();

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
 Fri Oct  3 06:06:39 2008
@@ -467,6 +467,11 @@
      */
     public void submitRequest(ServerCnxn cnxn, long sessionId, int type,
             int xid, ByteBuffer bb, List<Id> authInfo) {
+        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
+        submitRequest(si);
+    }
+    
+    public void submitRequest(Request si) {
         if (firstProcessor == null) {
             synchronized (this) {
                 try {
@@ -482,16 +487,15 @@
             }
         }
         try {
-            touch(cnxn);
-            Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
-            boolean validpacket = Request.isValid(type);
+            touch(si.cnxn);
+            boolean validpacket = Request.isValid(si.type);
             if (validpacket) {
                 firstProcessor.processRequest(si);
-                if (cnxn != null) {
+                if (si.cnxn != null) {
                     incInProcess();
                 }
             } else {
-                LOG.warn("Dropping packet at server of type " + type);
+                LOG.warn("Dropping packet at server of type " + si.type);
                 // if unvalid packet drop the packet.
             }
         } catch (IOException e) {

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/ProviderRegistry.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/ProviderRegistry.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/ProviderRegistry.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/ProviderRegistry.java
 Fri Oct  3 06:06:39 2008
@@ -36,7 +36,6 @@
         synchronized (ProviderRegistry.class) {
             if (initialized)
                 return;
-            initialized = true;
             IPAuthenticationProvider ipp = new IPAuthenticationProvider();
             HostAuthenticationProvider hostp = new HostAuthenticationProvider();
             DigestAuthenticationProvider digp = new DigestAuthenticationProvider();
@@ -59,6 +58,7 @@
                     }
                 }
             }
+            initialized = true;
         }
     }
 
@@ -67,4 +67,12 @@
             initialize();
         return authenticationProviders.get(scheme);
     }
+
+    public static String listProviders() {
+        StringBuilder sb = new StringBuilder();
+        for(String s: authenticationProviders.keySet()) {
+        sb.append(s + " ");
+}
+        return sb.toString();
+    }
 }

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
 Fri Oct  3 06:06:39 2008
@@ -49,8 +49,17 @@
 
     RequestProcessor nextProcessor;
 
-    public CommitProcessor(RequestProcessor nextProcessor) {
+    /**
+     * This flag indicates whether we need to wait for a response to come back from the
+     * leader or we just let the sync operation flow through like a read. The flag will
+     * be true if the CommitProcessor is in a Leader pipeline.
+     */
+    boolean matchSyncs;
+
+    public CommitProcessor(RequestProcessor nextProcessor, String id, boolean matchSyncs) {
+        super("CommitProcessor:" + id);
         this.nextProcessor = nextProcessor;
+        this.matchSyncs = matchSyncs;
         start();
     }
 
@@ -122,8 +131,11 @@
                             nextPending = request;
                             break;
                         case OpCode.sync:
-                            nextPending = request;
-                            //pendingSyncs.add(request);
+                            if (matchSyncs) {
+                                nextPending = request;
+                            } else {
+                                toProcess.add(request);
+                            }
                             break;
                         default:
                             toProcess.add(request);
@@ -145,7 +157,9 @@
                          new Exception("committing a null! "));
                 return;
             }
-            LOG.debug("Committing" + request.cxid);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Committing request:: " + request);
+            }
             committedRequests.add(request);
             notifyAll();
         }
@@ -153,9 +167,10 @@
 
     synchronized public void processRequest(Request request) {
         // request.addRQRec(">commit");
-        // LOG.info("Zoo processReq>>> cxid = " + request.cxid + " type =
-        // " + request.type + " id = " + request.sessionId + " cnxn " +
-        // request.cnxn);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Processing request:: " + request);
+        }
+        
         if (!finished) {
             queuedRequests.add(request);
             notifyAll();

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
 Fri Oct  3 06:06:39 2008
@@ -330,10 +330,12 @@
                     type = bb.getInt();
                     bb = bb.slice();
                     if(type == OpCode.sync){
-                        leader.setSyncHandler(this, sessionId);
-                    }
+                       leader.zk.submitRequest(new FollowerSyncRequest(this, sessionId, cxid, type, bb,
+                                qp.getAuthinfo()));
+                    } else {
                     leader.zk.submitRequest(null, sessionId, type, cxid, bb,
                             qp.getAuthinfo());
+                    }
                     break;
                 default:
                 }

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
 Fri Oct  3 06:06:39 2008
@@ -45,6 +45,7 @@
 
     public FollowerRequestProcessor(FollowerZooKeeperServer zks,
             RequestProcessor nextProcessor) {
+        super("FollowerRequestProcessor:" + zks.getClientPort());
         this.zks = zks;
         this.nextProcessor = nextProcessor;
         start();

Added: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerSyncRequest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerSyncRequest.java?rev=701369&view=auto
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerSyncRequest.java
 (added)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerSyncRequest.java
 Fri Oct  3 06:06:39 2008
@@ -0,0 +1,16 @@
+package org.apache.zookeeper.server.quorum;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.Request;
+
+public class FollowerSyncRequest extends Request {
+       FollowerHandler fh;
+       public FollowerSyncRequest(FollowerHandler fh, long sessionId, int xid, int type,
+                       ByteBuffer bb, List<Id> authInfo) {
+               super(null, sessionId, xid, type, bb, authInfo);
+               this.fh = fh;
+       }
+}

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
 Fri Oct  3 06:06:39 2008
@@ -80,7 +80,8 @@
     @Override
     protected void setupRequestProcessors() {
         RequestProcessor finalProcessor = new FinalRequestProcessor(this);
-        commitProcessor = new CommitProcessor(finalProcessor);
+        commitProcessor = new CommitProcessor(finalProcessor,
+                Integer.toString(getClientPort()), true);
         firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
         syncProcessor = new SyncRequestProcessor(this,
                 new SendAckRequestProcessor(getFollower()));
@@ -135,16 +136,16 @@
         commitProcessor.commit(request);
     }
     
-    public void sync(){
+    synchronized public void sync(){
         if(pendingSyncs.size() ==0){
             LOG.warn("Not expecting a sync.");
             return;
         }
                 
-        commitProcessor.commit(pendingSyncs.remove());
+        Request r = pendingSyncs.remove();
+               commitProcessor.commit(r);
     }
              
-         
     @Override
     public int getGlobalOutstandingLimit() {
         return super.getGlobalOutstandingLimit() / (self.getQuorumSize() - 1);

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
 Fri Oct  3 06:06:39 2008
@@ -25,9 +25,11 @@
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.SocketException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.log4j.Logger;
@@ -70,11 +72,8 @@
     public HashSet<FollowerHandler> forwardingFollowers = new HashSet<FollowerHandler>();
     
     //Pending sync requests
-    public HashMap<Long,Request> pendingSyncs = new HashMap<Long,Request>();
+    public HashMap<Long,List<FollowerSyncRequest>> pendingSyncs = new HashMap<Long,List<FollowerSyncRequest>>();
                
-    //Map sync request to FollowerHandler
-    public HashMap<Long,FollowerHandler> syncHandler = new HashMap<Long,FollowerHandler>();
-       
     /**
      * Adds follower to the leader.
      * 
@@ -253,7 +252,7 @@
         newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                 null, null);
         if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
-            LOG.error("NEWLEADER proposal has Zxid of "
+            LOG.warn("NEWLEADER proposal has Zxid of "
                     + newLeaderProposal.packet.getZxid());
         }
         outstandingProposals.add(newLeaderProposal);
@@ -373,17 +372,29 @@
      */
     synchronized public void processAck(long zxid, SocketAddress followerAddr) {
         boolean first = true;
-        /*
-         * LOG.error("Ack zxid: " + Long.toHexString(zxid)); for (Proposal
-         * p : outstandingProposals) { long packetZxid = p.packet.getZxid();
-         * LOG.error("outstanding proposal: " +
-         * Long.toHexString(packetZxid)); } LOG.error("outstanding
-         * proposals all");
-         */
+        
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ack zxid: 0x" + Long.toHexString(zxid));
+            for (Proposal p : outstandingProposals) {
+                long packetZxid = p.packet.getZxid();
+                LOG.debug("outstanding proposal: 0x"
+                        + Long.toHexString(packetZxid));
+            }
+            LOG.debug("outstanding proposals all");
+        }
+        
         if (outstandingProposals.size() == 0) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("outstanding is 0");
+            }
             return;
         }
         if (outstandingProposals.peek().packet.getZxid() > zxid) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("proposal has already been committed, pzxid:"
+                        + outstandingProposals.peek().packet.getZxid()
+                        + " zxid:" + zxid);
+            }
             // The proposal has already been committed
             return;
         }
@@ -391,13 +402,16 @@
             long packetZxid = p.packet.getZxid();
             if (packetZxid == zxid) {
                 p.ackCount++;
-                // LOG.error("FIXMSG",new RuntimeException(), "Count for " +
-                // Long.toHexString(zxid) + " is " + p.ackCount);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid)
+                            + " is " + p.ackCount);
+                }
+                
                 if (p.ackCount > self.quorumPeers.size() / 2){
                     if (!first) {
-                        LOG.error("Commiting zxid 0x" + Long.toHexString(zxid)
+                        LOG.fatal("Commiting zxid 0x" + Long.toHexString(zxid)
                                 + " from " + followerAddr + " not first!");
-                        LOG.error("First is "
+                        LOG.fatal("First is "
                                 + outstandingProposals.element().packet);
                         System.exit(13);
                     }
@@ -408,23 +422,23 @@
                     // We don't commit the new leader proposal
                     if ((zxid & 0xffffffffL) != 0) {
                         if (p.request == null) {
-                            LOG.error("Going to commmit null: " + p);
+                            LOG.warn("Going to commmit null: " + p);
                         }
                         commit(zxid);
                         zk.commitProcessor.commit(p.request);
                         if(pendingSyncs.containsKey(zxid)){
-                            sendSync(syncHandler.get(pendingSyncs.get(zxid).sessionId), pendingSyncs.get(zxid));
-                            syncHandler.remove(pendingSyncs.get(zxid));
-                            pendingSyncs.remove(zxid);
+                           for(FollowerSyncRequest r: pendingSyncs.remove(zxid)) {
+                               sendSync(r);
                         }
                     }
                 }
+                }
                 return;
             } else {
                 first = false;
             }
         }
-        LOG.error("Trying to commit future proposal: zxid 0x"
+        LOG.warn("Trying to commit future proposal: zxid 0x"
                 + Long.toHexString(zxid) + " from " + followerAddr);
     }
 
@@ -528,8 +542,7 @@
             }
             baos.close();
         } catch (IOException e) {
-            // This really should be impossible
-            LOG.error("FIXMSG",e);
+            LOG.warn("This really should be impossible", e);
         }
         QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, baos
                 .toByteArray(), null);
@@ -538,6 +551,10 @@
         p.packet = pp;
         p.request = request;
         synchronized (this) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Proposing:: " + request);
+            }
+
             outstandingProposals.add(p);
             lastProposed = p.packet.getZxid();
             sendPacket(pp);
@@ -551,44 +568,29 @@
      * @param r the request
      */
     
-    public void processSync(Request r){
+    synchronized public void processSync(FollowerSyncRequest r){
         if(outstandingProposals.isEmpty()){
-            LOG.warn("No outstanding proposal");
-            sendSync(syncHandler.get(r.sessionId), r);
-                syncHandler.remove(r.sessionId);
-        }
-        else{
-            pendingSyncs.put(lastProposed, r);
+            sendSync(r);
+        } else {
+            List<FollowerSyncRequest> l = pendingSyncs.get(lastProposed);
+            if (l == null) {
+                l = new ArrayList<FollowerSyncRequest>();
+            }
+            l.add(r);
+            pendingSyncs.put(lastProposed, l);
         }
     }
         
     /**
-     * Set FollowerHandler for sync.
-     * 
-     * @param f
-     * @param s
-     */
-        
-    synchronized public void setSyncHandler(FollowerHandler f, long s){
-        syncHandler.put(s, f);
-    }
-            
-    /**
      * Sends a sync message to the appropriate server
      * 
      * @param f
      * @param r
      */
             
-    public void sendSync(FollowerHandler f, Request r){
-        if(f != null){
-            QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
-            f.queuePacket(qp);
-        }
-        else{
-            LOG.warn("Committing sync: " + r.cxid );
-            zk.commitProcessor.commit(r);
-        }
+    public void sendSync(FollowerSyncRequest r){
+        QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
+        r.fh.queuePacket(qp);
     }
                 
     /**

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
 Fri Oct  3 06:06:39 2008
@@ -59,7 +59,8 @@
         RequestProcessor finalProcessor = new FinalRequestProcessor(this);
         RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
                 finalProcessor, getLeader().toBeApplied);
-        commitProcessor = new CommitProcessor(toBeAppliedProcessor);
+        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
+                Integer.toString(getClientPort()), false);
         RequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                 commitProcessor);
         firstProcessor = new PrepRequestProcessor(this, proposalProcessor);

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
 Fri Oct  3 06:06:39 2008
@@ -56,17 +56,10 @@
          * call processRequest on the next processor.
          */
         
-        if(request.type == ZooDefs.OpCode.sync){
-            zks.getLeader().processSync(request);
-
-            if(!zks.getLeader().syncHandler.containsKey(request.sessionId)){
-                zks.getLeader().syncHandler.put(request.sessionId, null);
+        if(request instanceof FollowerSyncRequest){
+            zks.getLeader().processSync((FollowerSyncRequest)request);
+        } else {
                 nextProcessor.processRequest(request);
-            }
-            
-        }
-        else{
-            nextProcessor.processRequest(request);
             if (request.hdr != null) {
                 // We need to sync and get consensus on any transactions
                 zks.getLeader().propose(request);

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
 Fri Oct  3 06:06:39 2008
@@ -384,6 +384,8 @@
 
     @Override
     public void run() {
+        setName("QuorumPeer:" + cnxnFactory.getLocalAddress());
+
         /*
          * Main loop
          */

Modified: 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOps.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOps.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOps.java 
(original)
+++ 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOps.java 
Fri Oct  3 06:06:39 2008
@@ -123,7 +123,7 @@
                 fail("unexpected interrupt");
             }
             // on the lookout for timeout
-            assertSame(latch.getCount(), 0L);
+            assertSame(0L, latch.getCount());
             
             String actual = toString();
             
@@ -410,11 +410,15 @@
             verify();
         }
         
+        public void setData() {
+            zk.setData(path, data, version, this, toString());
+        }
+        
         public void verifySetData() {
             stat.setVersion(1);
             new StringCB(zk).verifyCreate();
 
-            zk.setData(path, data, version, this, toString());
+            setData();
             verify();
         }
         
@@ -460,10 +464,14 @@
             super(zk, latch);
         }
         
+        public void delete() {
+            zk.delete(path, version, this, toString());
+        }
+        
         public void verifyDelete() {
             new StringCB(zk).verifyCreate();
 
-            zk.delete(path, version, this, toString());
+            delete();
             verify();
         }
         
@@ -473,8 +481,12 @@
             verify();
         }
         
-        public void verifySync() {
+        public void sync() {
             zk.sync(path, this, toString());
+        }
+        
+        public void verifySync() {
+            sync();
             verify();
         }
         

Modified: 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOpsTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOpsTest.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOpsTest.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOpsTest.java
 Fri Oct  3 06:06:39 2008
@@ -133,12 +133,12 @@
 
     @Test
     public void testAsyncExists() {
-        new StatCB(zk).verifySetData();
+        new StatCB(zk).verifyExists();
     }
 
     @Test
     public void testAsyncExistsFailure_NoNode() {
-        new StatCB(zk).verifySetData();
+        new StatCB(zk).verifyExistsFailure_NoNode();
     }
 
     @Test


Reply via email to