Author: breed Date: Thu Nov 12 19:53:07 2009 New Revision: 835515 URL: http://svn.apache.org/viewvc?rev=835515&view=rev Log: ZOOKEEPER-566. "reqs" four letter word (command port) returns no information
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsQuorumTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsTest.java Modified: hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionMXBean.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Modified: hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml?rev=835515&r1=835514&r2=835515&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml (original) +++ hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml Thu Nov 12 19:53:07 2009 @@ -930,29 +930,45 @@ composed of four letters. You issue the commands to ZooKeeper via telnet or nc, at the client port.</para> + <para>Three of the more interesting commands: "stat" gives some + general information about the server and connected clients, + while "srvr" and "cons" give extended details on server and + connections respectively.</para> + <variablelist> <varlistentry> - <term>dump</term> + <term>cons</term> <listitem> - <para>Lists the outstanding sessions and ephemeral nodes. 
This - only works on the leader.</para> + <para>List full connection/session details for all clients + connected to this server. Includes information on numbers + of packets received/sent, session id, operation latencies, + last operation performed, etc...</para> </listitem> </varlistentry> <varlistentry> - <term>envi</term> + <term>crst</term> <listitem> - <para>Print details about serving environment</para> + <para>Reset connection/session statistics for all connections.</para> </listitem> </varlistentry> <varlistentry> - <term>reqs</term> + <term>dump</term> <listitem> - <para>List outstanding requests</para> + <para>Lists the outstanding sessions and ephemeral nodes. This + only works on the leader.</para> + </listitem> + </varlistentry> + + <varlistentry> + <term>envi</term> + + <listitem> + <para>Print details about serving environment</para> </listitem> </varlistentry> @@ -970,7 +986,15 @@ <term>srst</term> <listitem> - <para>Reset statistics returned by stat command.</para> + <para>Reset server statistics.</para> + </listitem> + </varlistentry> + + <varlistentry> + <term>srvr</term> + + <listitem> + <para>Lists full details for the server.</para> </listitem> </varlistentry> @@ -978,7 +1002,7 @@ <term>stat</term> <listitem> - <para>Lists statistics about performance and connected + <para>Lists brief details for the server and connected clients.</para> </listitem> </varlistentry> Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java?rev=835515&r1=835514&r2=835515&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionBean.java Thu Nov 12 19:53:07 2009 @@ -29,20 +29,23 @@ import 
org.apache.log4j.Logger; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.jmx.ZKMBeanInfo; +import org.apache.zookeeper.server.NIOServerCnxn.CnxnStats; /** * Implementation of connection MBean interface. */ public class ConnectionBean implements ConnectionMXBean, ZKMBeanInfo { private static final Logger LOG = Logger.getLogger(ConnectionBean.class); - private ServerCnxn connection; - private ZooKeeperServer zk; - private Date timeCreated; - + + private final ServerCnxn connection; + private final CnxnStats stats; + + private final ZooKeeperServer zk; + public ConnectionBean(ServerCnxn connection,ZooKeeperServer zk){ - this.connection=connection; - this.zk=zk; - timeCreated=new Date(); + this.connection = connection; + this.stats = (CnxnStats)connection.getStats(); + this.zk = zk; } public String getSessionId() { @@ -80,7 +83,7 @@ } public String getStartedTime() { - return timeCreated.toString(); + return stats.getEstablished().toString(); } public void terminateSession() { @@ -96,6 +99,10 @@ connection.sendCloseSession(); } + public void resetCounters() { + stats.reset(); + } + @Override public String toString() { return "ConnectionBean{ClientIP=" + ObjectName.quote(getSourceIP()) @@ -103,19 +110,50 @@ } public long getOutstandingRequests() { - return connection.getStats().getOutstandingRequests(); + return stats.getOutstandingRequests(); } public long getPacketsReceived() { - return connection.getStats().getPacketsReceived(); + return stats.getPacketsReceived(); } public long getPacketsSent() { - return connection.getStats().getPacketsSent(); + return stats.getPacketsSent(); } public int getSessionTimeout() { return connection.getSessionTimeout(); } + public long getMinLatency() { + return stats.getMinLatency(); + } + + public long getAvgLatency() { + return stats.getAvgLatency(); + } + + public long getMaxLatency() { + return stats.getMaxLatency(); + } + + public String getLastOperation() { + return stats.getLastOperation(); + } + + 
public String getLastCxid() { + return "0x" + Long.toHexString(stats.getLastCxid()); + } + + public String getLastZxid() { + return "0x" + Long.toHexString(stats.getLastZxid()); + } + + public String getLastResponseTime() { + return new Date(stats.getLastResponseTime()).toString(); + } + + public long getLastLatency() { + return stats.getLastLatency(); + } } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionMXBean.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionMXBean.java?rev=835515&r1=835514&r2=835515&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionMXBean.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ConnectionMXBean.java Thu Nov 12 19:53:07 2009 @@ -65,4 +65,34 @@ * reconnect with the same session id. */ public void terminateConnection(); + + + /** Min latency in ms + * @since 3.3.0 */ + long getMinLatency(); + /** Average latency in ms + * @since 3.3.0 */ + long getAvgLatency(); + /** Max latency in ms + * @since 3.3.0 */ + long getMaxLatency(); + /** Last operation performed by this connection + * @since 3.3.0 */ + String getLastOperation(); + /** Last cxid of this connection + * @since 3.3.0 */ + String getLastCxid(); + /** Last zxid of this connection + * @since 3.3.0 */ + String getLastZxid(); + /** Last time server sent a response to client on this connection + * @since 3.3.0 */ + String getLastResponseTime(); + /** Latency of last response to client on this connection in ms + * @since 3.3.0 */ + long getLastLatency(); + + /** Reset counters + * @since 3.3.0 */ + void resetCounters(); } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=835515&r1=835514&r2=835515&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Thu Nov 12 19:53:07 2009 @@ -35,10 +35,10 @@ import org.apache.zookeeper.proto.ExistsResponse; import org.apache.zookeeper.proto.GetACLRequest; import org.apache.zookeeper.proto.GetACLResponse; -import org.apache.zookeeper.proto.GetChildrenRequest; -import org.apache.zookeeper.proto.GetChildrenResponse; import org.apache.zookeeper.proto.GetChildren2Request; import org.apache.zookeeper.proto.GetChildren2Response; +import org.apache.zookeeper.proto.GetChildrenRequest; +import org.apache.zookeeper.proto.GetChildrenResponse; import org.apache.zookeeper.proto.GetDataRequest; import org.apache.zookeeper.proto.GetDataResponse; import org.apache.zookeeper.proto.ReplyHeader; @@ -48,6 +48,7 @@ import org.apache.zookeeper.proto.SyncRequest; import org.apache.zookeeper.proto.SyncResponse; import org.apache.zookeeper.server.DataTree.ProcessTxnResult; +import org.apache.zookeeper.server.NIOServerCnxn.CnxnStats; import org.apache.zookeeper.server.NIOServerCnxn.Factory; import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord; import org.apache.zookeeper.txn.CreateSessionTxn; @@ -136,6 +137,9 @@ if (request.cnxn == null) { return; } + ServerCnxn cnxn = request.cnxn; + + String lastOp = "NA"; zks.decInProcess(); Code err = Code.OK; Record rsp = null; @@ -145,7 +149,7 @@ throw KeeperException.create(KeeperException.Code.get(( (ErrorTxn) request.txn).getErr())); } - + KeeperException ke = request.getException(); if (ke != null) { throw ke; @@ -156,39 +160,59 @@ } switch (request.type) { case OpCode.ping: { - request.cnxn.sendResponse(new 
ReplyHeader(-2, + zks.serverStats().updateLatency(request.createTime); + + lastOp = "PING"; + ((CnxnStats)cnxn.getStats()) + .updateForResponse(request.cxid, request.zxid, lastOp, + request.createTime, System.currentTimeMillis()); + + cnxn.sendResponse(new ReplyHeader(-2, zks.dataTree.lastProcessedZxid, 0), null, "response"); return; } case OpCode.createSession: { - request.cnxn.finishSessionInit(true); + zks.serverStats().updateLatency(request.createTime); + + lastOp = "SESS"; + ((CnxnStats)cnxn.getStats()) + .updateForResponse(request.cxid, request.zxid, lastOp, + request.createTime, System.currentTimeMillis()); + + cnxn.finishSessionInit(true); return; } case OpCode.create: { + lastOp = "CREA"; rsp = new CreateResponse(rc.path); err = Code.get(rc.err); break; } case OpCode.delete: { + lastOp = "DELE"; err = Code.get(rc.err); break; } case OpCode.setData: { + lastOp = "SETD"; rsp = new SetDataResponse(rc.stat); err = Code.get(rc.err); break; } case OpCode.setACL: { + lastOp = "SETD"; rsp = new SetACLResponse(rc.stat); err = Code.get(rc.err); break; } case OpCode.closeSession: { + lastOp = "CLOS"; closeSession = true; err = Code.get(rc.err); break; } case OpCode.sync: { + lastOp = "SYNC"; SyncRequest syncRequest = new SyncRequest(); ZooKeeperServer.byteBuffer2Record(request.request, syncRequest); @@ -196,6 +220,7 @@ break; } case OpCode.exists: { + lastOp = "EXIS"; // TODO we need to figure out the security requirement for this! ExistsRequest existsRequest = new ExistsRequest(); ZooKeeperServer.byteBuffer2Record(request.request, @@ -205,11 +230,12 @@ throw new KeeperException.BadArgumentsException(); } Stat stat = zks.dataTree.statNode(path, existsRequest - .getWatch() ? request.cnxn : null); + .getWatch() ? 
cnxn : null); rsp = new ExistsResponse(stat); break; } case OpCode.getData: { + lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ZooKeeperServer.byteBuffer2Record(request.request, getDataRequest); @@ -226,11 +252,12 @@ request.authInfo); Stat stat = new Stat(); byte b[] = zks.dataTree.getData(getDataRequest.getPath(), stat, - getDataRequest.getWatch() ? request.cnxn : null); + getDataRequest.getWatch() ? cnxn : null); rsp = new GetDataResponse(b, stat); break; } case OpCode.setWatches: { + lastOp = "SETW"; SetWatches setWatches = new SetWatches(); // XXX We really should NOT need this!!!! request.request.rewind(); @@ -239,10 +266,11 @@ zks.dataTree.setWatches(relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(), - setWatches.getChildWatches(), request.cnxn); + setWatches.getChildWatches(), cnxn); break; } case OpCode.getACL: { + lastOp = "GETA"; GetACLRequest getACLRequest = new GetACLRequest(); ZooKeeperServer.byteBuffer2Record(request.request, getACLRequest); @@ -253,6 +281,7 @@ break; } case OpCode.getChildren: { + lastOp = "GETC"; GetChildrenRequest getChildrenRequest = new GetChildrenRequest(); ZooKeeperServer.byteBuffer2Record(request.request, getChildrenRequest); @@ -269,11 +298,12 @@ request.authInfo); List<String> children = zks.dataTree.getChildren( getChildrenRequest.getPath(), null, getChildrenRequest - .getWatch() ? request.cnxn : null); + .getWatch() ? cnxn : null); rsp = new GetChildrenResponse(children); break; } case OpCode.getChildren2: { + lastOp = "GETC"; GetChildren2Request getChildren2Request = new GetChildren2Request(); ZooKeeperServer.byteBuffer2Record(request.request, getChildren2Request); @@ -291,7 +321,7 @@ request.authInfo); List<String> children = zks.dataTree.getChildren( getChildren2Request.getPath(), stat, getChildren2Request - .getWatch() ? request.cnxn : null); + .getWatch() ? 
cnxn : null); rsp = new GetChildren2Response(children, stat); break; } @@ -311,13 +341,19 @@ LOG.error("Dumping request buffer: 0x" + sb.toString()); err = Code.MARSHALLINGERROR; } + ReplyHeader hdr = new ReplyHeader(request.cxid, request.zxid, err.intValue()); + zks.serverStats().updateLatency(request.createTime); + ((CnxnStats)cnxn.getStats()) + .updateForResponse(request.cxid, request.zxid, lastOp, + request.createTime, System.currentTimeMillis()); + try { - request.cnxn.sendResponse(hdr, rsp, "response"); + cnxn.sendResponse(hdr, rsp, "response"); if (closeSession) { - request.cnxn.sendCloseSession(); + cnxn.sendCloseSession(); } } catch (IOException e) { LOG.error("FIXMSG",e); Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=835515&r1=835514&r2=835515&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Thu Nov 12 19:53:07 2009 @@ -32,13 +32,14 @@ import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; @@ -334,8 +335,6 @@ ArrayList<Id> authInfo = new ArrayList<Id>(); - LinkedList<Request> outstanding = new LinkedList<Request>(); - /* Send close connection packet to the client, doIO will eventually * close the underlying machinery (like socket, selectorkey, etc...) 
*/ @@ -388,7 +387,7 @@ public EndOfStreamException(String msg) { super(msg); } - + public String toString() { return "EndOfStreamException: " + getMessage(); } @@ -418,7 +417,7 @@ incomingBuffer = lenBuffer; } } - + void doIO(SelectionKey k) throws InterruptedException { try { if (sock == null) { @@ -701,14 +700,14 @@ } private void packetReceived() { - stats.packetsReceived++; + stats.incrPacketsReceived(); if (zk != null) { zk.serverStats().incrementPacketsReceived(); } } private void packetSent() { - stats.packetsSent++; + stats.incrPacketsSent(); if (zk != null) { zk.serverStats().incrementPacketsSent(); } @@ -779,41 +778,48 @@ } k.interestOps(SelectionKey.OP_WRITE); return true; - } else if (len == reqsCmd) { - LOG.info("Processing reqs command from " + } else if (len == statCmd || len == srvrCmd) { + LOG.info("Processing " + (len == statCmd ? "stat" : "srvr") + + " command from " + sock.socket().getRemoteSocketAddress()); packetReceived(); StringBuffer sb = new StringBuffer(); - sb.append("Requests:\n"); - synchronized (outstanding) { - for (Request r : outstanding) { - sb.append(r.toString()); - sb.append('\n'); + if (zk != null){ + sb.append("Zookeeper version: ").append(Version.getFullVersion()) + .append("\n"); + if (len == statCmd) { + sb.append("Clients:\n"); + synchronized(factory.cnxns){ + for(NIOServerCnxn c : factory.cnxns){ + sb.append(((CnxnStats)c.getStats()).toString(true)); + } + } + sb.append("\n"); } + sb.append(zk.serverStats().toString()); + sb.append("Node count: ").append(zk.dataTree.getNodeCount()). 
+ append("\n"); + } else { + sb.append("ZooKeeperServer not running\n"); } + sendBuffer(ByteBuffer.wrap(sb.toString().getBytes())); k.interestOps(SelectionKey.OP_WRITE); return true; - } else if (len == statCmd) { - LOG.info("Processing stat command from " + } else if (len == consCmd) { + LOG.info("Processing cons command from " + sock.socket().getRemoteSocketAddress()); packetReceived(); StringBuffer sb = new StringBuffer(); - if(zk != null){ - sb.append("Zookeeper version: ").append(Version.getFullVersion()) - .append("\n"); - sb.append("Clients:\n"); + if (zk != null){ synchronized(factory.cnxns){ for(NIOServerCnxn c : factory.cnxns){ - sb.append(c.getStats().toString()); + sb.append(((CnxnStats)c.getStats()).toString(false)); } } sb.append("\n"); - sb.append(zk.serverStats().toString()); - sb.append("Node count: ").append(zk.dataTree.getNodeCount()). - append("\n"); } else { sb.append("ZooKeeperServer not running\n"); } @@ -846,7 +852,21 @@ zk.serverStats().reset(); - sendBuffer(ByteBuffer.wrap("Stats reset.\n".getBytes())); + sendBuffer(ByteBuffer.wrap("Server stats reset.\n".getBytes())); + k.interestOps(SelectionKey.OP_WRITE); + return true; + } else if (len == crstCmd) { + LOG.info("Processing crst command from " + + sock.socket().getRemoteSocketAddress()); + packetReceived(); + + synchronized(factory.cnxns){ + for(NIOServerCnxn c : factory.cnxns){ + c.getStats().reset(); + } + } + + sendBuffer(ByteBuffer.wrap("Connection stats reset.\n".getBytes())); k.interestOps(SelectionKey.OP_WRITE); return true; } @@ -1146,13 +1166,77 @@ return (InetSocketAddress) sock.socket().getRemoteSocketAddress(); } - private class CnxnStats implements ServerCnxn.Stats{ - long packetsReceived; - long packetsSent; + class CnxnStats implements ServerCnxn.Stats { + private final Date established = new Date(); + + private final AtomicLong packetsReceived = new AtomicLong(); + private final AtomicLong packetsSent = new AtomicLong(); + + private long minLatency; + private long 
maxLatency; + private String lastOp; + private long lastCxid; + private long lastZxid; + private long lastResponseTime; + private long lastLatency; + + private long count; + private long totalLatency; + + CnxnStats() { + reset(); + } + + public synchronized void reset() { + packetsReceived.set(0); + packetsSent.set(0); + minLatency = Long.MAX_VALUE; + maxLatency = 0; + lastOp = "NA"; + lastCxid = -1; + lastZxid = -1; + lastResponseTime = 0; + lastLatency = 0; + + count = 0; + totalLatency = 0; + } + + long incrPacketsReceived() { + return packetsReceived.incrementAndGet(); + } + + long incrPacketsSent() { + return packetsSent.incrementAndGet(); + } + + synchronized void updateForResponse(long cxid, long zxid, String op, + long start, long end) + { + // don't overwrite with "special" xids - we're interested + // in the clients last real operation + if (cxid >= 0) { + lastCxid = cxid; + } + lastZxid = zxid; + lastOp = op; + lastResponseTime = end; + long elapsed = end - start; + lastLatency = elapsed; + if (elapsed < minLatency) { + minLatency = elapsed; + } + if (elapsed > maxLatency) { + maxLatency = elapsed; + } + count++; + totalLatency += elapsed; + } + + public Date getEstablished() { + return established; + } - /** - * The number of requests that have been submitted but not yet responded to. - */ public long getOutstandingRequests() { synchronized (NIOServerCnxn.this) { synchronized (NIOServerCnxn.this.factory) { @@ -1162,16 +1246,61 @@ } public long getPacketsReceived() { - return packetsReceived; + return packetsReceived.longValue(); } public long getPacketsSent() { - return packetsSent; + return packetsSent.longValue(); + } + + public synchronized long getMinLatency() { + return minLatency == Long.MAX_VALUE ? 0 : minLatency; + } + + public synchronized long getAvgLatency() { + return count == 0 ? 
0 : totalLatency / count; + } + + public synchronized long getMaxLatency() { + return maxLatency; + } + + public synchronized String getLastOperation() { + return lastOp; + } + + public synchronized long getLastCxid() { + return lastCxid; + } + + public synchronized long getLastZxid() { + return lastZxid; + } + + public synchronized long getLastResponseTime() { + return lastResponseTime; } + public synchronized long getLastLatency() { + return lastLatency; + } + + /** Prints brief stats information for the connection. + * + * @see toString(boolean) for detailed stats + */ @Override - public String toString(){ - StringBuilder sb=new StringBuilder(); + public String toString() { + return toString(false); + } + + /** + * Print information about the connection. + * @param brief iff true prints brief details, otw full detail + * @return information about this connection + */ + public String toString(boolean brief) { + StringBuilder sb = new StringBuilder(); Channel channel = sk.channel(); if (channel instanceof SocketChannel) { sb.append(" ").append(((SocketChannel)channel).socket() @@ -1179,15 +1308,37 @@ .append("[").append(Integer.toHexString(sk.interestOps())) .append("](queued=").append(getOutstandingRequests()) .append(",recved=").append(getPacketsReceived()) - .append(",sent=").append(getPacketsSent()).append(")\n"); + .append(",sent=").append(getPacketsSent()); + + if (!brief) { + long sessionId = getSessionId(); + if (sessionId != 0) { + sb.append(",sid=0x").append(Long.toHexString(sessionId)) + .append(",lop=").append(getLastOperation()) + .append(",est=").append(getEstablished().getTime()) + .append(",to=").append(getSessionTimeout()); + long lastCxid = getLastCxid(); + if (lastCxid >= 0) { + sb.append(",lcxid=0x") + .append(Long.toHexString(lastCxid)); + } + sb.append(",lzxid=0x") + .append(Long.toHexString(getLastZxid())) + .append(",lresp=").append(getLastResponseTime()) + .append(",llat=").append(getLastLatency()) + 
.append(",minlat=").append(getMinLatency()) + .append(",avglat=").append(getAvgLatency()) + .append(",maxlat=").append(getMaxLatency()); + } + } + sb.append(")\n"); } return sb.toString(); } } - private CnxnStats stats=new CnxnStats(); + private final CnxnStats stats = new CnxnStats(); public Stats getStats() { return stats; } - } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java?rev=835515&r1=835514&r2=835515&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java Thu Nov 12 19:53:07 2009 @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Date; import org.apache.jute.Record; import org.apache.zookeeper.WatchedEvent; @@ -29,8 +30,11 @@ import org.apache.zookeeper.data.Id; import org.apache.zookeeper.proto.ReplyHeader; +/** + * Interface to a Server connection - represents a connection from a client + * to the server. + */ public interface ServerCnxn extends Watcher { - /** * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands"> * Zk Admin</a>. this link is for all the commands. 
@@ -38,21 +42,25 @@ final static int ruokCmd = ByteBuffer.wrap("ruok".getBytes()).getInt(); final static int dumpCmd = ByteBuffer.wrap("dump".getBytes()).getInt(); - + final static int statCmd = ByteBuffer.wrap("stat".getBytes()).getInt(); - - final static int reqsCmd = ByteBuffer.wrap("reqs".getBytes()).getInt(); + + final static int srvrCmd = ByteBuffer.wrap("srvr".getBytes()).getInt(); + + final static int consCmd = ByteBuffer.wrap("cons".getBytes()).getInt(); final static int setTraceMaskCmd = ByteBuffer.wrap("stmk".getBytes()) .getInt(); - + final static int getTraceMaskCmd = ByteBuffer.wrap("gtmk".getBytes()) .getInt(); - + final static int enviCmd = ByteBuffer.wrap("envi".getBytes()).getInt(); - + final static int srstCmd = ByteBuffer.wrap("srst".getBytes()).getInt(); - + + final static int crstCmd = ByteBuffer.wrap("crst".getBytes()).getInt(); + final static ByteBuffer imok = ByteBuffer.wrap("imok".getBytes()); // This is just an arbitrary object to represent requests issued by @@ -64,8 +72,8 @@ void sendResponse(ReplyHeader h, Record r, String tag) throws IOException; /* notify the client the session is closing and close/cleanup socket */ - void sendCloseSession(); - + void sendCloseSession(); + void finishSessionInit(boolean valid); void process(WatchedEvent event); @@ -77,12 +85,53 @@ ArrayList<Id> getAuthInfo(); InetSocketAddress getRemoteAddress(); - + + /** + * Statistics on the ServerCnxn + */ interface Stats { - public long getOutstandingRequests(); - public long getPacketsReceived(); - public long getPacketsSent(); + /** Date/time the connection was established + * @since 3.3.0 */ + Date getEstablished(); + + /** + * The number of requests that have been submitted but not yet + * responded to. 
+ */ + long getOutstandingRequests(); + /** Number of packets received */ + long getPacketsReceived(); + /** Number of packets sent (incl notifications) */ + long getPacketsSent(); + /** Min latency in ms + * @since 3.3.0 */ + long getMinLatency(); + /** Average latency in ms + * @since 3.3.0 */ + long getAvgLatency(); + /** Max latency in ms + * @since 3.3.0 */ + long getMaxLatency(); + /** Last operation performed by this connection + * @since 3.3.0 */ + String getLastOperation(); + /** Last cxid of this connection + * @since 3.3.0 */ + long getLastCxid(); + /** Last zxid of this connection + * @since 3.3.0 */ + long getLastZxid(); + /** Last time server sent a response to client on this connection + * @since 3.3.0 */ + long getLastResponseTime(); + /** Latency of last response to client on this connection in ms + * @since 3.3.0 */ + long getLastLatency(); + + /** Reset counters + * @since 3.3.0 */ + void reset(); } - + Stats getStats(); } Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java?rev=835515&r1=835514&r2=835515&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java Thu Nov 12 19:53:07 2009 @@ -105,15 +105,16 @@ @Override synchronized public String toString() { - StringBuffer sb = new StringBuffer("Session Sets (" - + sessionSets.size() + "):\n"); + StringBuffer sb = new StringBuffer("Session Sets (") + .append(sessionSets.size()).append("):\n"); ArrayList<Long> keys = new ArrayList<Long>(sessionSets.keySet()); Collections.sort(keys); for (long time : keys) { - sb.append(sessionSets.get(time).sessions.size() + " expire at " - + new Date(time) + ":\n"); + 
sb.append(sessionSets.get(time).sessions.size()) + .append(" expire at ").append(new Date(time)).append(":\n"); for (SessionImpl s : sessionSets.get(time).sessions) { - sb.append("\t" + s.sessionId + "\n"); + sb.append("\t0x").append(Long.toHexString(s.sessionId)) + .append("\n"); } } return sb.toString(); Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=835515&r1=835514&r2=835515&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Thu Nov 12 19:53:07 2009 @@ -176,32 +176,44 @@ return zk; } - public static boolean waitForServerUp(String hp, long timeout) { - long start = System.currentTimeMillis(); + public static String send4LetterWord(String hp, String cmd) + throws IOException + { String split[] = hp.split(":"); String host = split[0]; int port = Integer.parseInt(split[1]); + + Socket sock = new Socket(host, port); + BufferedReader reader = null; + try { + OutputStream outstream = sock.getOutputStream(); + outstream.write(cmd.getBytes()); + outstream.flush(); + + reader = + new BufferedReader( + new InputStreamReader(sock.getInputStream())); + StringBuffer sb = new StringBuffer(); + String line; + while((line = reader.readLine()) != null) { + sb.append(line + "\n"); + } + return sb.toString(); + } finally { + sock.close(); + if (reader != null) { + reader.close(); + } + } + } + + public static boolean waitForServerUp(String hp, long timeout) { + long start = System.currentTimeMillis(); while (true) { try { - Socket sock = new Socket(host, port); - BufferedReader reader = null; - try { - OutputStream outstream = sock.getOutputStream(); - outstream.write("stat".getBytes()); - outstream.flush(); - - 
reader = - new BufferedReader( - new InputStreamReader(sock.getInputStream())); - String line = reader.readLine(); - if (line != null && line.startsWith("Zookeeper version:")) { - return true; - } - } finally { - sock.close(); - if (reader != null) { - reader.close(); - } + String result = send4LetterWord(hp, "stat"); + if (result.startsWith("Zookeeper version:")) { + return true; } } catch (IOException e) { // ignore as this is expected @@ -221,19 +233,9 @@ } public static boolean waitForServerDown(String hp, long timeout) { long start = System.currentTimeMillis(); - String split[] = hp.split(":"); - String host = split[0]; - int port = Integer.parseInt(split[1]); while (true) { try { - Socket sock = new Socket(host, port); - try { - OutputStream outstream = sock.getOutputStream(); - outstream.write("stat".getBytes()); - outstream.flush(); - } finally { - sock.close(); - } + send4LetterWord(hp, "stat"); } catch (IOException e) { return true; } Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsQuorumTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsQuorumTest.java?rev=835515&view=auto ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsQuorumTest.java (added) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsQuorumTest.java Thu Nov 12 19:53:07 2009 @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.IOException; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.TestableZooKeeper; +import org.junit.Test; + +public class FourLetterWordsQuorumTest extends QuorumBase { + protected static final Logger LOG = + Logger.getLogger(FourLetterWordsQuorumTest.class); + + /** Test the various four letter words + * ruok,envi,stat,srvr,cons,dump,srst,crst */ + @Test + public void testFourLetterWords() throws Exception { + String servers[] = hostPort.split(","); + for (String hp : servers) { + verify(hp, "ruok", "imok"); + verify(hp, "envi", "java.version"); + verify(hp, "stat", "Outstanding"); + verify(hp, "srvr", "Outstanding"); + verify(hp, "cons", "queued"); + verify(hp, "dump", "Session"); + + verify(hp, "srst", "reset"); + verify(hp, "crst", "reset"); + + verify(hp, "stat", "Outstanding"); + verify(hp, "srvr", "Outstanding"); + verify(hp, "cons", "queued"); + + TestableZooKeeper zk = createClient(hp); + String sid = "0x" + Long.toHexString(zk.getSessionId()); + + verify(hp, "stat", "queued"); + verify(hp, "srvr", "Outstanding"); + verify(hp, "cons", sid); + verify(hp, "dump", sid); + + zk.getData("/", false, null); + + verify(hp, "stat", "queued"); + verify(hp, "srvr", "Outstanding"); + verify(hp, "cons", sid); + verify(hp, "dump", sid); + + zk.close(); + + verify(hp, "ruok", "imok"); + verify(hp, "envi", "java.version"); + verify(hp, "stat", "Outstanding"); + verify(hp, "srvr", "Outstanding"); + verify(hp, "cons", "queued"); + verify(hp, "dump", "Session"); + + verify(hp, "srst", 
"reset"); + verify(hp, "crst", "reset"); + + verify(hp, "stat", "Outstanding"); + verify(hp, "srvr", "Outstanding"); + verify(hp, "cons", "queued"); + } + } + + private void verify(String hp, String cmd, String expected) + throws IOException + { + String resp = send4LetterWord(hp, cmd); + LOG.info("cmd " + cmd + " expected " + expected + " got " + resp); + if (cmd.equals("dump")) { + assertTrue(resp.contains(expected) + || resp.contains("Sessions with Ephemerals")); + } else { + assertTrue(resp.contains(expected)); + } + } +} Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsTest.java?rev=835515&view=auto ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsTest.java (added) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FourLetterWordsTest.java Thu Nov 12 19:53:07 2009 @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.test; + +import java.io.IOException; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.TestableZooKeeper; +import org.junit.Test; + +public class FourLetterWordsTest extends ClientBase { + protected static final Logger LOG = + Logger.getLogger(FourLetterWordsTest.class); + + /** Test the various four letter words + * ruok,envi,stat,srvr,cons,dump,srst,crst */ + @Test + public void testFourLetterWords() throws Exception { + verify("ruok", "imok"); + verify("envi", "java.version"); + verify("stat", "Outstanding"); + verify("srvr", "Outstanding"); + verify("cons", "queued"); + verify("dump", "Session"); + + verify("srst", "reset"); + verify("crst", "reset"); + + verify("stat", "Outstanding"); + verify("srvr", "Outstanding"); + verify("cons", "queued"); + + TestableZooKeeper zk = createClient(); + String sid = "0x" + Long.toHexString(zk.getSessionId()); + + verify("stat", "queued"); + verify("srvr", "Outstanding"); + verify("cons", sid); + verify("dump", sid); + + zk.getData("/", false, null); + + verify("stat", "queued"); + verify("srvr", "Outstanding"); + verify("cons", sid); + verify("dump", sid); + + zk.close(); + + verify("ruok", "imok"); + verify("envi", "java.version"); + verify("stat", "Outstanding"); + verify("srvr", "Outstanding"); + verify("cons", "queued"); + verify("dump", "Session"); + + verify("srst", "reset"); + verify("crst", "reset"); + + verify("stat", "Outstanding"); + verify("srvr", "Outstanding"); + verify("cons", "queued"); + } + + private void verify(String cmd, String expected) throws IOException { + String resp = send4LetterWord(hostPort, cmd); + LOG.info("cmd " + cmd + " expected " + expected + " got " + resp); + assertTrue(resp.contains(expected)); + } +}