Author: breed Date: Fri Sep 5 13:30:49 2008 New Revision: 692534 URL: http://svn.apache.org/viewvc?rev=692534&view=rev Log: ZOOKEEPER-112 src/java/main ZooKeeper.java has test code embedded into it.
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=692534&r1=692533&r2=692534&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Fri Sep 5 13:30:49 2008 @@ -70,7 +70,7 @@ * connected to as needed. * */ -class ClientCnxn { +public class ClientCnxn { private static final Logger LOG = Logger.getLogger(ClientCnxn.class); private ArrayList<InetSocketAddress> serverAddrs = @@ -112,6 +112,8 @@ private final ZooKeeper zooKeeper; + private final Watcher watcher; + private long sessionId; private byte sessionPasswd[] = new byte[16]; @@ -203,9 +205,11 @@ } } - public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper) - throws IOException { - this(hosts, sessionTimeout, zooKeeper, 0, new byte[16]); + public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper, + Watcher watcher) + throws IOException + { + this(hosts, sessionTimeout, zooKeeper, watcher, 0, new byte[16]); } /** @@ -222,8 +226,11 @@ * @throws IOException */ public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper, - long sessionId, byte[] sessionPasswd) throws IOException { + Watcher watcher, long sessionId, byte[] sessionPasswd) + throws IOException + { this.zooKeeper = zooKeeper; + this.watcher = watcher; this.sessionId = sessionId; this.sessionPasswd = sessionPasswd; String hostsList[] = hosts.split(","); @@ -273,7 +280,7 @@ break; } if (event instanceof WatcherEvent) { - zooKeeper.processWatchEvent((WatcherEvent) event); + watcher.process((WatcherEvent) event); } else { Packet p = (Packet) event; int rc = 0; Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=692534&r1=692533&r2=692534&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Fri Sep 5 13:30:49 2008 @@ -114,77 +114,86 @@ new HashMap<String, Set<Watcher>>(); /** - * Process a WatchEvent. - * - * Looks up the watch in the set of watches, processes the event - * if found, otw uses the default watcher (registered during instance - * creation) to process the watch. - * - * @param event the event to process. - */ - public void processWatchEvent(WatcherEvent event) { - // clear the watches if we are not connected - if (event.getState() != Watcher.Event.KeeperStateSyncConnected) { - synchronized (dataWatches) { - for (Set<Watcher> watchers : dataWatches.values()) { - for (Watcher watcher : watchers) { - watcher.process(event); + * Process watch events generated by the ClientCnxn object. + * + * We are implementing this as a nested class of ZooKeeper so that + * the public Watcher.process(event) method will not be exposed as part + * of the ZooKeeper client API. + */ + private class ZKWatcher implements Watcher { + /** + * Process a WatchEvent. + * + * Looks up the watch in the set of watches, processes the event + * if found, otw uses the default watcher (registered during instance + * creation) to process the watch. + * + * @param event the event to process. + */ + public void process(WatcherEvent event) { + // clear the watches if we are not connected + if (event.getState() != Watcher.Event.KeeperStateSyncConnected) { + synchronized (dataWatches) { + for (Set<Watcher> watchers : dataWatches.values()) { + for (Watcher watcher : watchers) { + watcher.process(event); + } } + dataWatches.clear(); } - dataWatches.clear(); - } - synchronized (childWatches) { - for (Set<Watcher> watchers : childWatches.values()) { - for (Watcher watcher : watchers) { - watcher.process(event); + synchronized (childWatches) { + for (Set<Watcher> watchers : childWatches.values()) { + for (Watcher watcher : watchers) { + watcher.process(event); + } } + childWatches.clear(); } - childWatches.clear(); - } - } - - Set<Watcher> watchers = null; - - switch (event.getType()) { - case Watcher.Event.EventNone: - defaultWatcher.process(event); - return; - case Watcher.Event.EventNodeDataChanged: - case Watcher.Event.EventNodeCreated: - synchronized (dataWatches) { - watchers = dataWatches.remove(event.getPath()); - } - break; - case Watcher.Event.EventNodeChildrenChanged: - synchronized (childWatches) { - watchers = childWatches.remove(event.getPath()); } - break; - case Watcher.Event.EventNodeDeleted: - synchronized (dataWatches) { - watchers = dataWatches.remove(event.getPath()); - } - Set<Watcher> cwatches; - synchronized (childWatches) { - cwatches = childWatches.remove(event.getPath()); - } - if (cwatches != null) { - if (watchers == null) { - watchers = cwatches; - } else { - watchers.addAll(cwatches); + + Set<Watcher> watchers = null; + + switch (event.getType()) { + case Watcher.Event.EventNone: + defaultWatcher.process(event); + return; + case Watcher.Event.EventNodeDataChanged: + case Watcher.Event.EventNodeCreated: + synchronized (dataWatches) { + watchers = dataWatches.remove(event.getPath()); + } + break; + case Watcher.Event.EventNodeChildrenChanged: + synchronized (childWatches) { + watchers = childWatches.remove(event.getPath()); } + break; + case Watcher.Event.EventNodeDeleted: + synchronized (dataWatches) { + watchers = dataWatches.remove(event.getPath()); + } + Set<Watcher> cwatches; + synchronized (childWatches) { + cwatches = childWatches.remove(event.getPath()); + } + if (cwatches != null) { + if (watchers == null) { + watchers = cwatches; + } else { + watchers.addAll(cwatches); + } + } + break; + default: + String msg = "Unhandled watch event type " + event.getType(); + LOG.error(msg); + throw new RuntimeException(msg); } - break; - default: - String msg = "Unhandled watch event type " + event.getType(); - LOG.error(msg); - throw new RuntimeException(msg); - } - - if (watchers != null) { - for (Watcher watcher : watchers) { - watcher.process(event); + + if (watchers != null) { + for (Watcher watcher : watchers) { + watcher.process(event); + } } } } @@ -257,19 +266,19 @@ volatile States state; - ClientCnxn cnxn; + protected ClientCnxn cnxn; public ZooKeeper(String host, int sessionTimeout, Watcher watcher) throws IOException { this.defaultWatcher = watcher; - cnxn = new ClientCnxn(host, sessionTimeout, this); + cnxn = new ClientCnxn(host, sessionTimeout, this, new ZKWatcher()); } public ZooKeeper(String host, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd) throws IOException { this.defaultWatcher = watcher; - cnxn = new ClientCnxn(host, sessionTimeout, this, sessionId, - sessionPasswd); + cnxn = new ClientCnxn(host, sessionTimeout, this, new ZKWatcher(), + sessionId, sessionPasswd); } /** @@ -291,10 +300,6 @@ cnxn.addAuthInfo(scheme, auth); } - public String describeCNXN() { - return cnxn.toString(); - } - public synchronized void register(Watcher watcher) { this.defaultWatcher = watcher; } @@ -935,13 +940,4 @@ public States getState() { return state; } - - // Everything below this line is for testing! - - /** Testing only!!! Really this needs to be moved into a stub in the - * tests - pending JIRA for that. - */ - public void disconnect() throws IOException { - cnxn.disconnect(); - } } Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java?rev=692534&view=auto ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java (added) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java Fri Sep 5 13:30:49 2008 @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.IOException; + +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; + +/** + * Specialized form of ZooKeeper specific for testing. Typically provides + * the ability to do unsafe or incorrect operations that allow negative + * testing. + */ +public class DisconnectableZooKeeper extends ZooKeeper { + public DisconnectableZooKeeper(String host, int sessionTimeout, Watcher watcher) + throws IOException + { + super(host, sessionTimeout, watcher); + } + + public DisconnectableZooKeeper(String host, int sessionTimeout, Watcher watcher, + long sessionId, byte[] sessionPasswd) + throws IOException + { + super(host, sessionTimeout, watcher, sessionId, sessionPasswd); + } + + /** Testing only!!! Really!!!! This is only here to test when the client + * disconnects from the server w/o sending a session disconnect (ie + * ending the session cleanly). The server will eventually notice the + * client is no longer pinging and will timeout the session. + */ + public void disconnect() throws IOException { + cnxn.disconnect(); + } + +} Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java?rev=692534&r1=692533&r2=692534&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java Fri Sep 5 13:30:49 2008 @@ -30,7 +30,6 @@ import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.CreateFlags; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; @@ -88,11 +87,12 @@ } } - private ZooKeeper createClient() + private DisconnectableZooKeeper createClient() throws IOException, InterruptedException { CountdownWatcher watcher = new CountdownWatcher(); - ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, watcher); + DisconnectableZooKeeper zk = + new DisconnectableZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, watcher); if(!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) { @@ -143,7 +143,7 @@ public void testSession() throws IOException, InterruptedException, KeeperException { - ZooKeeper zk = createClient(); + DisconnectableZooKeeper zk = createClient(); zk.create("/e", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL); LOG.info("zk with session id 0x" + Long.toHexString(zk.getSessionId()) @@ -156,9 +156,9 @@ Stat stat = new Stat(); startSignal = new CountDownLatch(1); - zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this, - zk.getSessionId(), - zk.getSessionPasswd()); + zk = new DisconnectableZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this, + zk.getSessionId(), + zk.getSessionPasswd()); startSignal.await(); LOG.info("zk with session id 0x" + Long.toHexString(zk.getSessionId())