Thanks for posting Robert!
On Nov 11, 2010, at 2:46 PM, Robert Crocombe wrote:
On Tue, Nov 9, 2010 at 12:34 PM, Jeremy Hanna
jeremy.hanna1...@gmail.comwrote:
Anyone know of a good blog post or docs anywhere that gives a simple
example of Watchers in action? I saw the one on:
http://hadoop.apache.org/zookeeper/docs/current/javaExample.html#ch_Introduction
but it seems kind of overly complicated for an intro to Watchers. I
appreciate the example but wondered if there were other examples out there.
Appended is a Java example of using a Watcher simply to wait for the client
to actually be connected to a server. I used it when I was confirming to my
satisfaction that there was a bug in the ZooKeeper recipe for WriteLock
awhile ago. I think this use is slightly unusual in that it is more
interested in KeeperState than the event type. A more conventional Watcher
might be like the following sketch (uhm, this is Groovy), though really
you'd have to look at both:
@Override
public void process(WatchedEvent event) {
switch (event?.getType()) {
case EventType.NodeDeleted:
// TODO: what should we do if the node being watched is itself
// deleted?
LOG.error(The node being watched ' + event.getPath + ' has been deleted:
that's not good)
break
case EventType.NodeChildrenChanged:
childrenChanged(event)
break
default:
LOG.debug(Ignoring event type ' + event?.getType() + ')
break
}
}
--
Robert Crocombe
package derp;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.recipes.lock.WriteLock;
public class Test {
private static final Log LOG = LogFactory.getLog(Test.class);
private static final String ZOO_CONFIG = 10.2.1.54:2181/test;
private static final String LOCK_DIR = /locking-test;
private static final int TIMEOUT_MILLIS = 1;
private static class ConnectWatcher implements Watcher {
private final Lock connectedLock = new ReentrantLock();
private final Condition connectedCondition = connectedLock.newCondition();
private final AtomicBoolean connected = new AtomicBoolean(false);
@Override
public void process(WatchedEvent event) {
LOG.debug(Event: + event);
KeeperState keeperState = event.getState();
switch (keeperState) {
case SyncConnected:
if (!connected.get()) {
connected.set(true);
signal();
}
break;
case Expired:
case Disconnected:
if (connected.get()) {
connected.set(false);
signal();
}
}
}
public void waitForConnection() throws InterruptedException {
connectedLock.lock();
try {
while (!connected.get()) {
LOG.debug(Waiting for condition to be signalled);
connectedCondition.await();
LOG.debug(Woken up on condition signalled);
}
} finally {
connectedLock.unlock();
}
LOG.debug(After signalling, we are connected);
}
@Override
public String toString() {
StringBuilder b = new StringBuilder([);
b.append(connectedLock:).append(connectedLock);
b.append(,connectedCondition:).append(connectedCondition);
b.append(,connected:).append(connected);
b.append(]);
return b.toString();
}
private void signal() {
LOG.debug(Signaling after event);
connectedLock.lock();
try {
connectedCondition.signal();
} finally {
connectedLock.unlock();
}
}
}
private static final void fine(ZooKeeper lowerId, ZooKeeper higherId) throws
KeeperException,
InterruptedException {
WriteLock lower = new WriteLock(lowerId, LOCK_DIR, null);
WriteLock higher = new WriteLock(higherId, LOCK_DIR, null);
boolean lowerAcquired = lower.lock();
assert lowerAcquired;
LOG.debug(Lower acquired lock successfully, so higher should fail);
boolean higherAcquired = higher.lock();
assert !higherAcquired;
LOG.debug(Correct: higher session fails to acquire lock);
lower.unlock();
// Now that lower has unlocked, higher will acquire. Really should use
// the version of WriteLock with the LockListener, but a short sleep
// should do.
Thread.sleep(2000);
higher.unlock();
// make sure we let go.
assert !higher.isOwner();
}
/*
* Using recipes from ZooKeeper 3.2.1.
*
* This bug occurs because the sort in ZooKeeperLockOperation.execute
* (beginning @ line 221) orders the paths, but the paths contain the
* session ID (lines 206-207), so that sorting the paths places all paths
* with a lower session ID before those with a higher, i.e. the sorting is
* not just by sequence number. I think this is a bug.
*
* The result is that a lock acquisition by a WriteLock coming from a
* ZooKeeper with a high session ID