Author: breed
Date: Thu Nov 12 19:53:07 2009
New Revision: 835515

URL: http://svn.apache.org/viewvc?rev=835515&view=rev
Log:
ZOOKEEPER-566. "reqs" four letter word (command port) returns no information

Added:
    
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsQuorumTest.java
    
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsTest.java
Modified:
    
hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionMXBean.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
    
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
    
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java

Modified: 
hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml?rev=835515&r1=835514&r2=835515&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
 (original)
+++ 
hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
 Thu Nov 12 19:53:07 2009
@@ -930,29 +930,45 @@
       composed of four letters. You issue the commands to ZooKeeper via telnet
       or nc, at the client port.</para>
 
+      <para>Three of the more interesting commands: "stat" gives some
+      general information about the server and connected clients,
+      while "srvr" and "cons" give extended details on server and
+      connections respectively.</para>
+
       <variablelist>
         <varlistentry>
-          <term>dump</term>
+          <term>cons</term>
 
           <listitem>
-            <para>Lists the outstanding sessions and ephemeral nodes. This
-            only works on the leader.</para>
+            <para>List full connection/session details for all clients
+            connected to this server. Includes information on numbers
+            of packets received/sent, session id, operation latencies,
+            last operation performed, etc...</para>
           </listitem>
         </varlistentry>
 
         <varlistentry>
-          <term>envi</term>
+          <term>crst</term>
 
           <listitem>
-            <para>Print details about serving environment</para>
+            <para>Reset connection/session statistics for all connections.</para>
           </listitem>
         </varlistentry>
 
         <varlistentry>
-          <term>reqs</term>
+          <term>dump</term>
 
           <listitem>
-            <para>List outstanding requests</para>
+            <para>Lists the outstanding sessions and ephemeral nodes. This
+            only works on the leader.</para>
+          </listitem>
+        </varlistentry>
+
+        <varlistentry>
+          <term>envi</term>
+
+          <listitem>
+            <para>Print details about serving environment</para>
           </listitem>
         </varlistentry>
 
@@ -970,7 +986,15 @@
           <term>srst</term>
 
           <listitem>
-            <para>Reset statistics returned by stat command.</para>
+            <para>Reset server statistics.</para>
+          </listitem>
+        </varlistentry>
+
+        <varlistentry>
+          <term>srvr</term>
+
+          <listitem>
+            <para>Lists full details for the server.</para>
           </listitem>
         </varlistentry>
 
@@ -978,7 +1002,7 @@
           <term>stat</term>
 
           <listitem>
-            <para>Lists statistics about performance and connected
+            <para>Lists brief details for the server and connected
             clients.</para>
           </listitem>
         </varlistentry>

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java?rev=835515&r1=835514&r2=835515&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java
 Thu Nov 12 19:53:07 2009
@@ -29,20 +29,23 @@
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
+import org.apache.zookeeper.server.NIOServerCnxn.CnxnStats;
 
 /**
  * Implementation of connection MBean interface.
  */
 public class ConnectionBean implements ConnectionMXBean, ZKMBeanInfo {
     private static final Logger LOG = Logger.getLogger(ConnectionBean.class);
-    private ServerCnxn connection;
-    private ZooKeeperServer zk;
-    private Date timeCreated;
-    
+
+    private final ServerCnxn connection;
+    private final CnxnStats stats;
+
+    private final ZooKeeperServer zk;
+
     public ConnectionBean(ServerCnxn connection,ZooKeeperServer zk){
-        this.connection=connection;
-        this.zk=zk;
-        timeCreated=new Date();
+        this.connection = connection;
+        this.stats = (CnxnStats)connection.getStats();
+        this.zk = zk;
     }
     
     public String getSessionId() {
@@ -80,7 +83,7 @@
     }
     
     public String getStartedTime() {
-        return timeCreated.toString();
+        return stats.getEstablished().toString();
     }
     
     public void terminateSession() {
@@ -96,6 +99,10 @@
         connection.sendCloseSession();
     }
 
+    public void resetCounters() {
+        stats.reset();
+    }
+
     @Override
     public String toString() {
         return "ConnectionBean{ClientIP=" + ObjectName.quote(getSourceIP())
@@ -103,19 +110,50 @@
     }
     
     public long getOutstandingRequests() {
-        return connection.getStats().getOutstandingRequests();
+        return stats.getOutstandingRequests();
     }
     
     public long getPacketsReceived() {
-        return connection.getStats().getPacketsReceived();
+        return stats.getPacketsReceived();
     }
     
     public long getPacketsSent() {
-        return connection.getStats().getPacketsSent();
+        return stats.getPacketsSent();
     }
     
     public int getSessionTimeout() {
         return connection.getSessionTimeout();
     }
 
+    public long getMinLatency() {
+        return stats.getMinLatency();
+    }
+
+    public long getAvgLatency() {
+        return stats.getAvgLatency();
+    }
+
+    public long getMaxLatency() {
+        return stats.getMaxLatency();
+    }
+    
+    public String getLastOperation() {
+        return stats.getLastOperation();
+    }
+
+    public String getLastCxid() {
+        return "0x" + Long.toHexString(stats.getLastCxid());
+    }
+
+    public String getLastZxid() {
+        return "0x" + Long.toHexString(stats.getLastZxid());
+    }
+
+    public String getLastResponseTime() {
+        return new Date(stats.getLastResponseTime()).toString();
+    }
+
+    public long getLastLatency() {
+        return stats.getLastLatency();
+    }
 }

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionMXBean.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionMXBean.java?rev=835515&r1=835514&r2=835515&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionMXBean.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionMXBean.java
 Thu Nov 12 19:53:07 2009
@@ -65,4 +65,34 @@
      * reconnect with the same session id.
      */
     public void terminateConnection();
+
+
+    /** Min latency in ms
+     * @since 3.3.0 */
+    long getMinLatency();
+    /** Average latency in ms
+     * @since 3.3.0 */
+    long getAvgLatency();
+    /** Max latency in ms
+     * @since 3.3.0 */
+    long getMaxLatency();
+    /** Last operation performed by this connection
+     * @since 3.3.0 */
+    String getLastOperation();
+    /** Last cxid of this connection
+     * @since 3.3.0 */
+    String getLastCxid();
+    /** Last zxid of this connection
+     * @since 3.3.0 */
+    String getLastZxid();
+    /** Last time server sent a response to client on this connection
+     * @since 3.3.0 */
+    String getLastResponseTime();
+    /** Latency of last response to client on this connection in ms
+     * @since 3.3.0 */
+    long getLastLatency();
+
+    /** Reset counters
+     * @since 3.3.0 */
+    void resetCounters();
 }

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=835515&r1=835514&r2=835515&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
 Thu Nov 12 19:53:07 2009
@@ -35,10 +35,10 @@
 import org.apache.zookeeper.proto.ExistsResponse;
 import org.apache.zookeeper.proto.GetACLRequest;
 import org.apache.zookeeper.proto.GetACLResponse;
-import org.apache.zookeeper.proto.GetChildrenRequest;
-import org.apache.zookeeper.proto.GetChildrenResponse;
 import org.apache.zookeeper.proto.GetChildren2Request;
 import org.apache.zookeeper.proto.GetChildren2Response;
+import org.apache.zookeeper.proto.GetChildrenRequest;
+import org.apache.zookeeper.proto.GetChildrenResponse;
 import org.apache.zookeeper.proto.GetDataRequest;
 import org.apache.zookeeper.proto.GetDataResponse;
 import org.apache.zookeeper.proto.ReplyHeader;
@@ -48,6 +48,7 @@
 import org.apache.zookeeper.proto.SyncRequest;
 import org.apache.zookeeper.proto.SyncResponse;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
+import org.apache.zookeeper.server.NIOServerCnxn.CnxnStats;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.txn.CreateSessionTxn;
@@ -136,6 +137,9 @@
         if (request.cnxn == null) {
             return;
         }
+        ServerCnxn cnxn = request.cnxn;
+
+        String lastOp = "NA";
         zks.decInProcess();
         Code err = Code.OK;
         Record rsp = null;
@@ -145,7 +149,7 @@
                 throw KeeperException.create(KeeperException.Code.get((
                         (ErrorTxn) request.txn).getErr()));
             }
-            
+
             KeeperException ke = request.getException();
             if (ke != null) {
                 throw ke;
@@ -156,39 +160,59 @@
             }
             switch (request.type) {
             case OpCode.ping: {
-                request.cnxn.sendResponse(new ReplyHeader(-2,
+                zks.serverStats().updateLatency(request.createTime);
+
+                lastOp = "PING";
+                ((CnxnStats)cnxn.getStats())
+                .updateForResponse(request.cxid, request.zxid, lastOp,
+                        request.createTime, System.currentTimeMillis());
+
+                cnxn.sendResponse(new ReplyHeader(-2,
                         zks.dataTree.lastProcessedZxid, 0), null, "response");
                 return;
             }
             case OpCode.createSession: {
-                request.cnxn.finishSessionInit(true);
+                zks.serverStats().updateLatency(request.createTime);
+
+                lastOp = "SESS";
+                ((CnxnStats)cnxn.getStats())
+                .updateForResponse(request.cxid, request.zxid, lastOp,
+                        request.createTime, System.currentTimeMillis());
+
+                cnxn.finishSessionInit(true);
                 return;
             }
             case OpCode.create: {
+                lastOp = "CREA";
                 rsp = new CreateResponse(rc.path);
                 err = Code.get(rc.err);
                 break;
             }
             case OpCode.delete: {
+                lastOp = "DELE";
                 err = Code.get(rc.err);
                 break;
             }
             case OpCode.setData: {
+                lastOp = "SETD";
                 rsp = new SetDataResponse(rc.stat);
                 err = Code.get(rc.err);
                 break;
             }
             case OpCode.setACL: {
+                lastOp = "SETD";
                 rsp = new SetACLResponse(rc.stat);
                 err = Code.get(rc.err);
                 break;
             }
             case OpCode.closeSession: {
+                lastOp = "CLOS";
                 closeSession = true;
                 err = Code.get(rc.err);
                 break;
             }
             case OpCode.sync: {
+                lastOp = "SYNC";
                 SyncRequest syncRequest = new SyncRequest();
                 ZooKeeperServer.byteBuffer2Record(request.request,
                         syncRequest);
@@ -196,6 +220,7 @@
                 break;
             }
             case OpCode.exists: {
+                lastOp = "EXIS";
                 // TODO we need to figure out the security requirement for this!
                 ExistsRequest existsRequest = new ExistsRequest();
                 ZooKeeperServer.byteBuffer2Record(request.request,
@@ -205,11 +230,12 @@
                     throw new KeeperException.BadArgumentsException();
                 }
                 Stat stat = zks.dataTree.statNode(path, existsRequest
-                        .getWatch() ? request.cnxn : null);
+                        .getWatch() ? cnxn : null);
                 rsp = new ExistsResponse(stat);
                 break;
             }
             case OpCode.getData: {
+                lastOp = "GETD";
                 GetDataRequest getDataRequest = new GetDataRequest();
                 ZooKeeperServer.byteBuffer2Record(request.request,
                         getDataRequest);
@@ -226,11 +252,12 @@
                         request.authInfo);
                 Stat stat = new Stat();
                 byte b[] = zks.dataTree.getData(getDataRequest.getPath(), stat,
-                        getDataRequest.getWatch() ? request.cnxn : null);
+                        getDataRequest.getWatch() ? cnxn : null);
                 rsp = new GetDataResponse(b, stat);
                 break;
             }
             case OpCode.setWatches: {
+                lastOp = "SETW";
                 SetWatches setWatches = new SetWatches();
                 // XXX We really should NOT need this!!!!
                 request.request.rewind();
@@ -239,10 +266,11 @@
                 zks.dataTree.setWatches(relativeZxid, 
                         setWatches.getDataWatches(), 
                         setWatches.getExistWatches(),
-                        setWatches.getChildWatches(), request.cnxn);
+                        setWatches.getChildWatches(), cnxn);
                 break;
             }
             case OpCode.getACL: {
+                lastOp = "GETA";
                 GetACLRequest getACLRequest = new GetACLRequest();
                 ZooKeeperServer.byteBuffer2Record(request.request,
                         getACLRequest);
@@ -253,6 +281,7 @@
                 break;
             }
             case OpCode.getChildren: {
+                lastOp = "GETC";
                 GetChildrenRequest getChildrenRequest = new 
GetChildrenRequest();
                 ZooKeeperServer.byteBuffer2Record(request.request,
                         getChildrenRequest);
@@ -269,11 +298,12 @@
                         request.authInfo);
                 List<String> children = zks.dataTree.getChildren(
                         getChildrenRequest.getPath(), null, getChildrenRequest
-                                .getWatch() ? request.cnxn : null);
+                                .getWatch() ? cnxn : null);
                 rsp = new GetChildrenResponse(children);
                 break;
             }
             case OpCode.getChildren2: {
+                lastOp = "GETC";
                 GetChildren2Request getChildren2Request = new 
GetChildren2Request();
                 ZooKeeperServer.byteBuffer2Record(request.request,
                         getChildren2Request);
@@ -291,7 +321,7 @@
                         request.authInfo);
                 List<String> children = zks.dataTree.getChildren(
                         getChildren2Request.getPath(), stat, 
getChildren2Request
-                                .getWatch() ? request.cnxn : null);
+                                .getWatch() ? cnxn : null);
                 rsp = new GetChildren2Response(children, stat);
                 break;
             }
@@ -311,13 +341,19 @@
             LOG.error("Dumping request buffer: 0x" + sb.toString());
             err = Code.MARSHALLINGERROR;
         }
+
         ReplyHeader hdr =
             new ReplyHeader(request.cxid, request.zxid, err.intValue());
+
         zks.serverStats().updateLatency(request.createTime);
+        ((CnxnStats)cnxn.getStats())
+            .updateForResponse(request.cxid, request.zxid, lastOp,
+                    request.createTime, System.currentTimeMillis());
+
         try {
-            request.cnxn.sendResponse(hdr, rsp, "response");
+            cnxn.sendResponse(hdr, rsp, "response");
             if (closeSession) {
-                request.cnxn.sendCloseSession();
+                cnxn.sendCloseSession();
             }
         } catch (IOException e) {
             LOG.error("FIXMSG",e);

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=835515&r1=835514&r2=835515&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
 Thu Nov 12 19:53:07 2009
@@ -32,13 +32,14 @@
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
@@ -334,8 +335,6 @@
 
     ArrayList<Id> authInfo = new ArrayList<Id>();
 
-    LinkedList<Request> outstanding = new LinkedList<Request>();
-
     /* Send close connection packet to the client, doIO will eventually
      * close the underlying machinery (like socket, selectorkey, etc...)
      */
@@ -388,7 +387,7 @@
         public EndOfStreamException(String msg) {
             super(msg);
         }
-        
+
         public String toString() {
             return "EndOfStreamException: " + getMessage();
         }
@@ -418,7 +417,7 @@
             incomingBuffer = lenBuffer;
         }
     }
-    
+
     void doIO(SelectionKey k) throws InterruptedException {
         try {
             if (sock == null) {
@@ -701,14 +700,14 @@
     }
 
     private void packetReceived() {
-        stats.packetsReceived++;
+        stats.incrPacketsReceived();
         if (zk != null) {
             zk.serverStats().incrementPacketsReceived();
         }
     }
 
     private void packetSent() {
-        stats.packetsSent++;
+        stats.incrPacketsSent();
         if (zk != null) {
             zk.serverStats().incrementPacketsSent();
         }
@@ -779,41 +778,48 @@
             }
             k.interestOps(SelectionKey.OP_WRITE);
             return true;
-        } else if (len == reqsCmd) {
-            LOG.info("Processing reqs command from "
+        } else if (len == statCmd || len == srvrCmd) {
+            LOG.info("Processing " + (len == statCmd ? "stat" : "srvr") 
+                    + " command from "
                     + sock.socket().getRemoteSocketAddress());
             packetReceived();
 
             StringBuffer sb = new StringBuffer();
-            sb.append("Requests:\n");
-            synchronized (outstanding) {
-                for (Request r : outstanding) {
-                    sb.append(r.toString());
-                    sb.append('\n');
+            if (zk != null){
+                sb.append("Zookeeper version: 
").append(Version.getFullVersion())
+                    .append("\n");
+                if (len == statCmd) {
+                    sb.append("Clients:\n");
+                    synchronized(factory.cnxns){
+                        for(NIOServerCnxn c : factory.cnxns){
+                            
sb.append(((CnxnStats)c.getStats()).toString(true));
+                        }
+                    }
+                    sb.append("\n");
                 }
+                sb.append(zk.serverStats().toString());
+                sb.append("Node count: ").append(zk.dataTree.getNodeCount()).
+                    append("\n");
+            } else {
+                sb.append("ZooKeeperServer not running\n");
             }
+
             sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
             k.interestOps(SelectionKey.OP_WRITE);
             return true;
-        } else if (len == statCmd) {
-            LOG.info("Processing stat command from "
+        } else if (len == consCmd) {
+            LOG.info("Processing cons command from "
                     + sock.socket().getRemoteSocketAddress());
             packetReceived();
 
             StringBuffer sb = new StringBuffer();
-            if(zk != null){
-                sb.append("Zookeeper version: 
").append(Version.getFullVersion())
-                    .append("\n");
-                sb.append("Clients:\n");
+            if (zk != null){
                 synchronized(factory.cnxns){
                     for(NIOServerCnxn c : factory.cnxns){
-                        sb.append(c.getStats().toString());
+                        sb.append(((CnxnStats)c.getStats()).toString(false));
                     }
                 }
                 sb.append("\n");
-                sb.append(zk.serverStats().toString());
-                sb.append("Node count: ").append(zk.dataTree.getNodeCount()).
-                    append("\n");
             } else {
                 sb.append("ZooKeeperServer not running\n");
             }
@@ -846,7 +852,21 @@
 
             zk.serverStats().reset();
 
-            sendBuffer(ByteBuffer.wrap("Stats reset.\n".getBytes()));
+            sendBuffer(ByteBuffer.wrap("Server stats reset.\n".getBytes()));
+            k.interestOps(SelectionKey.OP_WRITE);
+            return true;
+        } else if (len == crstCmd) {
+            LOG.info("Processing crst command from "
+                    + sock.socket().getRemoteSocketAddress());
+            packetReceived();
+
+            synchronized(factory.cnxns){
+                for(NIOServerCnxn c : factory.cnxns){
+                    c.getStats().reset();
+                }
+            }
+
+            sendBuffer(ByteBuffer.wrap("Connection stats 
reset.\n".getBytes()));
             k.interestOps(SelectionKey.OP_WRITE);
             return true;
         }
@@ -1146,13 +1166,77 @@
         return (InetSocketAddress) sock.socket().getRemoteSocketAddress();
     }
 
-    private class CnxnStats implements ServerCnxn.Stats{
-        long packetsReceived;
-        long packetsSent;
+    class CnxnStats implements ServerCnxn.Stats {
+        private final Date established = new Date();
+
+        private final AtomicLong packetsReceived = new AtomicLong();
+        private final AtomicLong packetsSent = new AtomicLong();
+
+        private long minLatency;
+        private long maxLatency;
+        private String lastOp;
+        private long lastCxid;
+        private long lastZxid;
+        private long lastResponseTime;
+        private long lastLatency;
+
+        private long count;
+        private long totalLatency;
+
+        CnxnStats() {
+            reset();
+        }
+
+        public synchronized void reset() {
+            packetsReceived.set(0);
+            packetsSent.set(0);
+            minLatency = Long.MAX_VALUE;
+            maxLatency = 0;
+            lastOp = "NA";
+            lastCxid = -1;
+            lastZxid = -1;
+            lastResponseTime = 0;
+            lastLatency = 0;
+
+            count = 0;
+            totalLatency = 0;
+        }
+
+        long incrPacketsReceived() {
+            return packetsReceived.incrementAndGet();
+        }
+
+        long incrPacketsSent() {
+            return packetsSent.incrementAndGet();
+        }
+
+        synchronized void updateForResponse(long cxid, long zxid, String op,
+                long start, long end)
+        {
+            // don't overwrite with "special" xids - we're interested
+            // in the clients last real operation
+            if (cxid >= 0) {
+                lastCxid = cxid;
+            }
+            lastZxid = zxid;
+            lastOp = op;
+            lastResponseTime = end;
+            long elapsed = end - start;
+            lastLatency = elapsed;
+            if (elapsed < minLatency) {
+                minLatency = elapsed;
+            }
+            if (elapsed > maxLatency) {
+                maxLatency = elapsed;
+            }
+            count++;
+            totalLatency += elapsed;
+        }
+
+        public Date getEstablished() {
+            return established;
+        }
 
-        /**
-         * The number of requests that have been submitted but not yet responded to.
-         */
         public long getOutstandingRequests() {
             synchronized (NIOServerCnxn.this) {
                 synchronized (NIOServerCnxn.this.factory) {
@@ -1162,16 +1246,61 @@
         }
 
         public long getPacketsReceived() {
-            return packetsReceived;
+            return packetsReceived.longValue();
         }
 
         public long getPacketsSent() {
-            return packetsSent;
+            return packetsSent.longValue();
+        }
+
+        public synchronized long getMinLatency() {
+            return minLatency == Long.MAX_VALUE ? 0 : minLatency;
+        }
+
+        public synchronized long getAvgLatency() {
+            return count == 0 ? 0 : totalLatency / count;
+        }
+
+        public synchronized long getMaxLatency() {
+            return maxLatency;
+        }
+
+        public synchronized String getLastOperation() {
+            return lastOp;
+        }
+
+        public synchronized long getLastCxid() {
+            return lastCxid;
+        }
+
+        public synchronized long getLastZxid() {
+            return lastZxid;
+        }
+
+        public synchronized long getLastResponseTime() {
+            return lastResponseTime;
         }
 
+        public synchronized long getLastLatency() {
+            return lastLatency;
+        }
+
+        /** Prints detailed stats information for the connection.
+         * 
+         * @see #toString(boolean) for brief stats
+         */
         @Override
-        public String toString(){
-            StringBuilder sb=new StringBuilder();
+        public String toString() {
+            return toString(false);
+        }
+
+        /**
+         * Print information about the connection.
+         * @param brief if true, prints brief details; otherwise full details
+         * @return information about this connection
+         */
+        public String toString(boolean brief) {
+            StringBuilder sb = new StringBuilder();
             Channel channel = sk.channel();
             if (channel instanceof SocketChannel) {
                 sb.append(" ").append(((SocketChannel)channel).socket()
@@ -1179,15 +1308,37 @@
                   .append("[").append(Integer.toHexString(sk.interestOps()))
                   .append("](queued=").append(getOutstandingRequests())
                   .append(",recved=").append(getPacketsReceived())
-                  .append(",sent=").append(getPacketsSent()).append(")\n");
+                  .append(",sent=").append(getPacketsSent());
+
+                if (!brief) {
+                    long sessionId = getSessionId();
+                    if (sessionId != 0) {
+                        
sb.append(",sid=0x").append(Long.toHexString(sessionId))
+                            .append(",lop=").append(getLastOperation())
+                            .append(",est=").append(getEstablished().getTime())
+                            .append(",to=").append(getSessionTimeout());
+                        long lastCxid = getLastCxid();
+                        if (lastCxid >= 0) {
+                            sb.append(",lcxid=0x")
+                                .append(Long.toHexString(lastCxid));
+                        }
+                        sb.append(",lzxid=0x")
+                                .append(Long.toHexString(getLastZxid()))
+                            .append(",lresp=").append(getLastResponseTime())
+                            .append(",llat=").append(getLastLatency())
+                            .append(",minlat=").append(getMinLatency())
+                            .append(",avglat=").append(getAvgLatency())
+                            .append(",maxlat=").append(getMaxLatency());
+                    }
+                }
+                sb.append(")\n");
             }
             return sb.toString();
         }
     }
 
-    private CnxnStats stats=new CnxnStats();
+    private final CnxnStats stats = new CnxnStats();
     public Stats getStats() {
         return stats;
     }
-
 }

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java?rev=835515&r1=835514&r2=835515&view=diff
==============================================================================
--- 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
 (original)
+++ 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
 Thu Nov 12 19:53:07 2009
@@ -22,6 +22,7 @@
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Date;
 
 import org.apache.jute.Record;
 import org.apache.zookeeper.WatchedEvent;
@@ -29,8 +30,11 @@
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.proto.ReplyHeader;
 
+/**
+ * Interface to a Server connection - represents a connection from a client
+ * to the server.
+ */
 public interface ServerCnxn extends Watcher {
-    
     /**
      * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
      * Zk Admin</a>. this link is for all the commands.
@@ -38,21 +42,25 @@
     final static int ruokCmd = ByteBuffer.wrap("ruok".getBytes()).getInt();
 
     final static int dumpCmd = ByteBuffer.wrap("dump".getBytes()).getInt();
-    
+
     final static int statCmd = ByteBuffer.wrap("stat".getBytes()).getInt();
-    
-    final static int reqsCmd = ByteBuffer.wrap("reqs".getBytes()).getInt();
+
+    final static int srvrCmd = ByteBuffer.wrap("srvr".getBytes()).getInt();
+
+    final static int consCmd = ByteBuffer.wrap("cons".getBytes()).getInt();
 
     final static int setTraceMaskCmd = ByteBuffer.wrap("stmk".getBytes())
             .getInt();
-    
+
     final static int getTraceMaskCmd = ByteBuffer.wrap("gtmk".getBytes())
             .getInt();
-    
+
     final static int enviCmd = ByteBuffer.wrap("envi".getBytes()).getInt();
-    
+
     final static int srstCmd = ByteBuffer.wrap("srst".getBytes()).getInt();
-    
+
+    final static int crstCmd = ByteBuffer.wrap("crst".getBytes()).getInt();
+
     final static ByteBuffer imok = ByteBuffer.wrap("imok".getBytes());
 
     // This is just an arbitrary object to represent requests issued by
@@ -64,8 +72,8 @@
     void sendResponse(ReplyHeader h, Record r, String tag) throws IOException;
 
     /* notify the client the session is closing and close/cleanup socket */
-    void sendCloseSession(); 
-    
+    void sendCloseSession();
+
     void finishSessionInit(boolean valid);
 
     void process(WatchedEvent event);
@@ -77,12 +85,53 @@
     ArrayList<Id> getAuthInfo();
 
     InetSocketAddress getRemoteAddress();
-    
+
+    /**
+     * Statistics on the ServerCnxn
+     */
     interface Stats {
-        public long getOutstandingRequests();
-        public long getPacketsReceived();
-        public long getPacketsSent();
+        /** Date/time the connection was established
+         * @since 3.3.0 */
+        Date getEstablished();
+
+        /**
+         * The number of requests that have been submitted but not yet
+         * responded to.
+         */
+        long getOutstandingRequests();
+        /** Number of packets received */
+        long getPacketsReceived();
+        /** Number of packets sent (incl notifications) */
+        long getPacketsSent();
+        /** Min latency in ms
+         * @since 3.3.0 */
+        long getMinLatency();
+        /** Average latency in ms
+         * @since 3.3.0 */
+        long getAvgLatency();
+        /** Max latency in ms
+         * @since 3.3.0 */
+        long getMaxLatency();
+        /** Last operation performed by this connection
+         * @since 3.3.0 */
+        String getLastOperation();
+        /** Last cxid of this connection
+         * @since 3.3.0 */
+        long getLastCxid();
+        /** Last zxid of this connection
+         * @since 3.3.0 */
+        long getLastZxid();
+        /** Last time server sent a response to client on this connection
+         * @since 3.3.0 */
+        long getLastResponseTime();
+        /** Latency of last response to client on this connection in ms
+         * @since 3.3.0 */
+        long getLastLatency();
+
+        /** Reset counters
+         * @since 3.3.0 */
+        void reset();
     }
-    
+
     Stats getStats();
 }

Modified: 
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java?rev=835515&r1=835514&r2=835515&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java Thu Nov 12 19:53:07 2009
@@ -105,15 +105,16 @@
 
     @Override
     synchronized public String toString() {
-        StringBuffer sb = new StringBuffer("Session Sets ("
-                + sessionSets.size() + "):\n");
+        StringBuffer sb = new StringBuffer("Session Sets (")
+                .append(sessionSets.size()).append("):\n");
         ArrayList<Long> keys = new ArrayList<Long>(sessionSets.keySet());
         Collections.sort(keys);
         for (long time : keys) {
-            sb.append(sessionSets.get(time).sessions.size() + " expire at "
-                    + new Date(time) + ":\n");
+            sb.append(sessionSets.get(time).sessions.size())
+                .append(" expire at ").append(new Date(time)).append(":\n");
             for (SessionImpl s : sessionSets.get(time).sessions) {
-                sb.append("\t" + s.sessionId + "\n");
+                sb.append("\t0x").append(Long.toHexString(s.sessionId))
+                    .append("\n");
             }
         }
         return sb.toString();

Modified: 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=835515&r1=835514&r2=835515&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Thu Nov 12 19:53:07 2009
@@ -176,32 +176,44 @@
         return zk;
     }
 
-    public static boolean waitForServerUp(String hp, long timeout) {
-        long start = System.currentTimeMillis();
+    public static String send4LetterWord(String hp, String cmd)
+        throws IOException
+    {
         String split[] = hp.split(":");
         String host = split[0];
         int port = Integer.parseInt(split[1]);
+
+        Socket sock = new Socket(host, port);
+        BufferedReader reader = null;
+        try {
+            OutputStream outstream = sock.getOutputStream();
+            outstream.write(cmd.getBytes());
+            outstream.flush();
+
+            reader =
+                new BufferedReader(
+                        new InputStreamReader(sock.getInputStream()));
+            StringBuffer sb = new StringBuffer();
+            String line;
+            while((line = reader.readLine()) != null) {
+                sb.append(line + "\n");
+            }
+            return sb.toString();
+        } finally {
+            sock.close();
+            if (reader != null) {
+                reader.close();
+            }
+        }
+    }
+
+    public static boolean waitForServerUp(String hp, long timeout) {
+        long start = System.currentTimeMillis();
         while (true) {
             try {
-                Socket sock = new Socket(host, port);
-                BufferedReader reader = null;
-                try {
-                    OutputStream outstream = sock.getOutputStream();
-                    outstream.write("stat".getBytes());
-                    outstream.flush();
-
-                    reader =
-                        new BufferedReader(
-                                new InputStreamReader(sock.getInputStream()));
-                    String line = reader.readLine();
-                    if (line != null && line.startsWith("Zookeeper version:")) {
-                        return true;
-                    }
-                } finally {
-                    sock.close();
-                    if (reader != null) {
-                        reader.close();
-                    }
+                String result = send4LetterWord(hp, "stat");
+                if (result.startsWith("Zookeeper version:")) {
+                    return true;
                 }
             } catch (IOException e) {
                 // ignore as this is expected
@@ -221,19 +233,9 @@
     }
     public static boolean waitForServerDown(String hp, long timeout) {
         long start = System.currentTimeMillis();
-        String split[] = hp.split(":");
-        String host = split[0];
-        int port = Integer.parseInt(split[1]);
         while (true) {
             try {
-                Socket sock = new Socket(host, port);
-                try {
-                    OutputStream outstream = sock.getOutputStream();
-                    outstream.write("stat".getBytes());
-                    outstream.flush();
-                } finally {
-                    sock.close();
-                }
+                send4LetterWord(hp, "stat");
             } catch (IOException e) {
                 return true;
             }

Added: 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsQuorumTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsQuorumTest.java?rev=835515&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsQuorumTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsQuorumTest.java Thu Nov 12 19:53:07 2009
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.junit.Test;
+
+public class FourLetterWordsQuorumTest extends QuorumBase {
+    protected static final Logger LOG =
+        Logger.getLogger(FourLetterWordsQuorumTest.class);
+
+    /** Test the various four letter words
+     * ruok,envi,stat,srvr,cons,dump,srst,crst */
+    @Test
+    public void testFourLetterWords() throws Exception {
+        String servers[] = hostPort.split(",");
+        for (String hp : servers) {
+            verify(hp, "ruok", "imok");
+            verify(hp, "envi", "java.version");
+            verify(hp, "stat", "Outstanding");
+            verify(hp, "srvr", "Outstanding");
+            verify(hp, "cons", "queued");
+            verify(hp, "dump", "Session");
+
+            verify(hp, "srst", "reset");
+            verify(hp, "crst", "reset");
+
+            verify(hp, "stat", "Outstanding");
+            verify(hp, "srvr", "Outstanding");
+            verify(hp, "cons", "queued");
+
+            TestableZooKeeper zk = createClient(hp);
+            String sid = "0x" + Long.toHexString(zk.getSessionId());
+
+            verify(hp, "stat", "queued");
+            verify(hp, "srvr", "Outstanding");
+            verify(hp, "cons", sid);
+            verify(hp, "dump", sid);
+
+            zk.getData("/", false, null);
+
+            verify(hp, "stat", "queued");
+            verify(hp, "srvr", "Outstanding");
+            verify(hp, "cons", sid);
+            verify(hp, "dump", sid);
+
+            zk.close();
+
+            verify(hp, "ruok", "imok");
+            verify(hp, "envi", "java.version");
+            verify(hp, "stat", "Outstanding");
+            verify(hp, "srvr", "Outstanding");
+            verify(hp, "cons", "queued");
+            verify(hp, "dump", "Session");
+
+            verify(hp, "srst", "reset");
+            verify(hp, "crst", "reset");
+
+            verify(hp, "stat", "Outstanding");
+            verify(hp, "srvr", "Outstanding");
+            verify(hp, "cons", "queued");
+        }
+    }
+
+    private void verify(String hp, String cmd, String expected)
+        throws IOException
+    {
+        String resp = send4LetterWord(hp, cmd);
+        LOG.info("cmd " + cmd + " expected " + expected + " got " + resp);
+        if (cmd.equals("dump")) {
+            assertTrue(resp.contains(expected)
+                    || resp.contains("Sessions with Ephemerals"));
+        } else {
+            assertTrue(resp.contains(expected));
+        }
+    }
+}

Added: 
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsTest.java?rev=835515&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsTest.java Thu Nov 12 19:53:07 2009
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.junit.Test;
+
+public class FourLetterWordsTest extends ClientBase {
+    protected static final Logger LOG =
+        Logger.getLogger(FourLetterWordsTest.class);
+
+    /** Test the various four letter words
+     * ruok,envi,stat,srvr,cons,dump,srst,crst */
+    @Test
+    public void testFourLetterWords() throws Exception {
+        verify("ruok", "imok");
+        verify("envi", "java.version");
+        verify("stat", "Outstanding");
+        verify("srvr", "Outstanding");
+        verify("cons", "queued");
+        verify("dump", "Session");
+
+        verify("srst", "reset");
+        verify("crst", "reset");
+
+        verify("stat", "Outstanding");
+        verify("srvr", "Outstanding");
+        verify("cons", "queued");
+
+        TestableZooKeeper zk = createClient();
+        String sid = "0x" + Long.toHexString(zk.getSessionId());
+
+        verify("stat", "queued");
+        verify("srvr", "Outstanding");
+        verify("cons", sid);
+        verify("dump", sid);
+
+        zk.getData("/", false, null);
+
+        verify("stat", "queued");
+        verify("srvr", "Outstanding");
+        verify("cons", sid);
+        verify("dump", sid);
+
+        zk.close();
+
+        verify("ruok", "imok");
+        verify("envi", "java.version");
+        verify("stat", "Outstanding");
+        verify("srvr", "Outstanding");
+        verify("cons", "queued");
+        verify("dump", "Session");
+
+        verify("srst", "reset");
+        verify("crst", "reset");
+
+        verify("stat", "Outstanding");
+        verify("srvr", "Outstanding");
+        verify("cons", "queued");
+    }
+
+    private void verify(String cmd, String expected) throws IOException {
+        String resp = send4LetterWord(hostPort, cmd);
+        LOG.info("cmd " + cmd + " expected " + expected + " got " + resp);
+        assertTrue(resp.contains(expected));
+    }
+}


Reply via email to