Added: hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml?rev=703480&view=auto ============================================================================== --- hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml (added) +++ hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml Fri Oct 10 06:58:03 2008 @@ -0,0 +1,674 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Copyright 2002-2004 The Apache Software Foundation + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!DOCTYPE article PUBLIC "-//OASIS//DTD Simplified DocBook XML V1.0//EN" +"http://www.oasis-open.org/docbook/xml/simple/1.0/sdocbook.dtd"> +<article id="ar_Tutorial"> + <title>Programming with ZooKeeper - A basic tutorial</title> + + <articleinfo> + <legalnotice> + <para>Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. You may + obtain a copy of the License at <ulink + url="http://www.apache.org/licenses/LICENSE-2.0">http://www.apache.org/licenses/LICENSE-2.0</ulink>.</para> + + <para>Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an "AS IS" + BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied. 
See the License for the specific language governing permissions + and limitations under the License.</para> + </legalnotice> + + <abstract> + <para>This article contains sample Java code for simple implementations of barriers + and producer-consumer queues.</para> + + </abstract> + </articleinfo> + + <section id="ch_Introduction"> + <title>Introduction</title> + + <para>In this tutorial, we show simple implementations of barriers and + producer-consumer queues using ZooKeeper. We call the respective classes Barrier and Queue. + These examples assume that you have at least one ZooKeeper server running.</para> + + <para>Both primitives use the following common excerpt of code:</para> + + <programlisting> +static ZooKeeper zk = null; +static Integer mutex; + +String root; + +SyncPrimitive(String address) { + if(zk == null){ + try { + System.out.println("Starting ZK:"); + zk = new ZooKeeper(address, 3000, this); + mutex = new Integer(-1); + System.out.println("Finished starting ZK: " + zk); + } catch (KeeperException e) { + System.out.println("Keeper exception when starting new session: " + + e.toString()); + zk = null; + } catch (IOException e) { + System.out.println(e.toString()); + zk = null; + } + } +} + +synchronized public void process(WatcherEvent event) { + synchronized (mutex) { + mutex.notify(); + } +} + +</programlisting> + +<para>Both classes extend SyncPrimitive. In this way, we execute steps that are +common to all primitives in the constructor of SyncPrimitive. To keep the examples +simple, we create a ZooKeeper object the first time we instantiate either a barrier +object or a queue object, and we declare a static variable that is a reference +to this object. Subsequent instances of Barrier and Queue check whether a +ZooKeeper object exists. 
Alternatively, we could have the application create a +ZooKeeper object and pass it to the constructors of Barrier and Queue.</para> +<para> +We use the process() method to process notifications triggered by watches. +In the following discussion, we present code that sets watches. A watch is an internal +structure that enables ZooKeeper to notify a client of a change to a node. For example, +if a client is waiting for other clients to leave a barrier, then it can set a watch and +wait for modifications to a particular node, which can indicate that the wait is over. +This point becomes clear once we go over the examples. +</para> +</section> + + <section id="sc_barriers"><title>Barriers</title> + + <para> + A barrier is a primitive that enables a group of processes to synchronize the + beginning and the end of a computation. The general idea of this implementation + is to have a barrier node that serves as the parent of the individual + process nodes. Suppose that we call the barrier node "/b1". Each process "p" then + creates a node "/b1/p". Once enough processes have created their corresponding + nodes, the processes that have joined can start the computation. + </para> + + <para>In this example, each process instantiates a Barrier object, and its constructor takes as parameters:</para> + + <itemizedlist><listitem><para>the address of a ZooKeeper server (e.g., "zoo1.foo.com:2181")</para></listitem> +<listitem><para>the path of the barrier node on ZooKeeper (e.g., "/b1")</para></listitem> +<listitem><para>the size of the group of processes</para></listitem> +</itemizedlist> + +<para>The constructor of Barrier passes the address of the ZooKeeper server to the +constructor of the parent class. The parent class creates a ZooKeeper instance if +one does not exist. 
The constructor of Barrier then creates a +barrier node on ZooKeeper, which is the parent node of all process nodes and which +we call root (<emphasis role="bold">Note:</emphasis> This is not the ZooKeeper root "/").</para> + +<programlisting> + /** + * Barrier constructor + * + * @param address + * @param name + * @param size + */ +Barrier(String address, String name, int size) { + super(address); + this.root = name; + this.size = size; + + // Create barrier node + if (zk != null) { + try { + Stat s = zk.exists(root, false); + if (s == null) { + zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0); + } + } catch (KeeperException e) { + System.out.println("Keeper exception when instantiating barrier: " + e.toString()); + } catch (InterruptedException e) { + System.out.println("Interrupted exception"); + } + } + + // My node name (assign the field; the constructor parameter "name" shadows it) + try { + this.name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()); + } catch (UnknownHostException e) { + System.out.println(e.toString()); + } + +} +</programlisting> +<para> +To enter the barrier, a process calls enter(). The process creates a node under +the root to represent it, using its host name to form the node name. It then waits +until enough processes have entered the barrier. A process does this by checking +the number of children the root node has with "getChildren()", and waiting for +notifications if there are not yet enough. To receive a notification when +there is a change to the root node, a process has to set a watch, and does so +through the call to "getChildren()". In the code, "getChildren()" +takes two parameters. The first one states the node to read from, and the second is +a boolean flag that enables the process to set a watch. In the code the flag is true. 
+</para> + +<programlisting> + /** + * Join barrier + * + * @return + * @throws KeeperException + * @throws InterruptedException + */ + +boolean enter() throws KeeperException, InterruptedException{ + zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateFlags.EPHEMERAL); + while (true) { + synchronized (mutex) { + ArrayList<String> list = zk.getChildren(root, true); + + if (list.size() < size) { + mutex.wait(); + } else { + return true; + } + } + } +} +</programlisting> +<para> +Note that enter() throws both KeeperException and InterruptedException, so it is +the responsibility of the application to catch and handle such exceptions.</para> + +<para> +Once the computation is finished, a process calls leave() to leave the barrier. +First it deletes its corresponding node, and then it gets the children of the root +node. If there is at least one child, then it waits for a notification (note +that the second parameter of the call to getChildren() is true, meaning that +ZooKeeper has to set a watch on the root node). Upon reception of a notification, +it checks once more whether the root node has any children.</para> + +<programlisting> + /** + * Leave barrier: wait until all processes have left + * + * @return + * @throws KeeperException + * @throws InterruptedException + */ + +boolean leave() throws KeeperException, InterruptedException{ + zk.delete(root + "/" + name, 0); + while (true) { + synchronized (mutex) { + ArrayList<String> list = zk.getChildren(root, true); + if (list.size() > 0) { + mutex.wait(); + } else { + return true; + } + } + } +} +</programlisting> +</section> +<section id="sc_producerConsumerQueues"><title>Producer-Consumer Queues</title> +<para> +A producer-consumer queue is a distributed data structure that a group of processes +use to generate and consume items. Producer processes create new elements and add +them to the queue. Consumer processes remove elements from the queue, and process them. 
+In this implementation, the elements are simple integers. The queue is represented +by a root node, and to add an element to the queue, a producer process creates a new node, +a child of the root node. +</para> + +<para> +The following excerpt of code corresponds to the constructor of the object. As +with Barrier objects, it first calls the constructor of the parent class, SyncPrimitive, +which creates a ZooKeeper object if one doesn't exist. It then checks whether the root +node of the queue exists, and creates it if it doesn't. +</para> +<programlisting> +/** + * Constructor of producer-consumer queue + * + * @param address + * @param name + */ +Queue(String address, String name) { + super(address); + this.root = name; + // Create the root node of the queue + if (zk != null) { + try { + Stat s = zk.exists(root, false); + if (s == null) { + zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0); + } + } catch (KeeperException e) { + System.out + .println("Keeper exception when instantiating queue: " + + e.toString()); + } catch (InterruptedException e) { + System.out.println("Interrupted exception"); + } + } +} + </programlisting> + +<para> +A producer process calls "produce()" to add an element to the queue, and passes +an integer as an argument. To add an element to the queue, the method creates a +new node using "create()", and uses the SEQUENCE flag to instruct ZooKeeper to +append the value of the sequence counter associated with the root node to the node name. In this way, +we impose a total order on the elements of the queue, thus guaranteeing that the +oldest element of the queue is the next one consumed. +</para> + +<programlisting> +/** + * Add element to the queue. 
+ * + * @param i + * @return + */ + +boolean produce(int i) throws KeeperException, InterruptedException{ + ByteBuffer b = ByteBuffer.allocate(4); + byte[] value; + + // Add child with value i + b.putInt(i); + value = b.array(); + zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, + CreateFlags.SEQUENCE); + + return true; +} +</programlisting> +<para> +To consume an element, a consumer process obtains the children of the root node, +reads the node with the smallest counter value, deletes it, and returns the element. Note that +if there is a conflict, then one of the two contending processes won't be able to +delete the node and the delete operation will throw an exception.</para> + +<para> +A call to getChildren() returns the list of children in lexicographic order. +As lexicographic order does not necessarily follow the numerical order of the counter +values, we need to decide which element is the smallest. To decide which one has +the smallest counter value, we traverse the list, and remove the prefix "element" +from each name to obtain its counter value.</para> + +<programlisting> +/** + * Remove first element from the queue. 
+ * + * @return + * @throws KeeperException + * @throws InterruptedException + */ +int consume() throws KeeperException, InterruptedException{ + int retvalue = -1; + Stat stat = null; + + // Get the first element available + while (true) { + synchronized (mutex) { + ArrayList<String> list = zk.getChildren(root, true); + if (list.size() == 0) { + System.out.println("Going to wait"); + mutex.wait(); + } else { + Integer min = new Integer(list.get(0).substring(7)); + for(String s : list){ + Integer tempValue = new Integer(s.substring(7)); + if(tempValue < min) min = tempValue; // keep the smallest counter value + } + System.out.println("Temporary value: " + root + "/element" + min); + byte[] b = zk.getData(root + "/element" + min, false, stat); + zk.delete(root + "/element" + min, 0); + ByteBuffer buffer = ByteBuffer.wrap(b); + retvalue = buffer.getInt(); + + return retvalue; + } + } + } +} +</programlisting> + +</section> +<section id="sc_sourceListing"><title>Complete Source Listing</title> +<example id="eg_SyncPrimitive_java"> +<title>SyncPrimitive.java</title> +<programlisting> +package com.yahoo.SyncPrimitive; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.lang.InterruptedException; +import java.util.ArrayList; +import java.util.Random; + +import com.yahoo.zookeeper.Watcher; +import com.yahoo.zookeeper.data.Stat; +import com.yahoo.zookeeper.proto.WatcherEvent; +import com.yahoo.zookeeper.KeeperException; +import com.yahoo.zookeeper.ZooDefs.Ids; +import com.yahoo.zookeeper.ZooDefs.CreateFlags; +import com.yahoo.zookeeper.ZooKeeper; + +public class SyncPrimitive implements Watcher { + + static ZooKeeper zk = null; + static Integer mutex; + + String root; + + SyncPrimitive(String address) { + if(zk == null){ + try { + System.out.println("Starting ZK:"); + zk = new ZooKeeper(address, 3000, this); + mutex = new Integer(-1); + System.out.println("Finished starting ZK: " + zk); + } catch (KeeperException e) { + 
System.out.println("Keeper exception when starting new session: " + + e.toString()); + zk = null; + } catch (IOException e) { + System.out.println(e.toString()); + zk = null; + } + } + //else mutex = new Integer(-1); + } + + synchronized public void process(WatcherEvent event) { + synchronized (mutex) { + //System.out.println("Process: " + event.getType()); + mutex.notify(); + } + } + + /** + * Barrier + */ + static public class Barrier extends SyncPrimitive { + int size; + String name; + + /** + * Barrier constructor + * + * @param address + * @param name + * @param size + */ + Barrier(String address, String name, int size) { + super(address); + this.root = name; + this.size = size; + + // Create barrier node + if (zk != null) { + try { + Stat s = zk.exists(root, false); + if (s == null) { + zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0); + } + } catch (KeeperException e) { + System.out + .println("Keeper exception when instantiating barrier: " + + e.toString()); + } catch (InterruptedException e) { + System.out.println("Interrupted exception"); + } + } + + // My node name (assign the field; the constructor parameter "name" shadows it) + try { + this.name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()); + } catch (UnknownHostException e) { + System.out.println(e.toString()); + } + + } + + /** + * Join barrier + * + * @return + * @throws KeeperException + * @throws InterruptedException + */ + + boolean enter() throws KeeperException, InterruptedException{ + zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateFlags.EPHEMERAL); + while (true) { + synchronized (mutex) { + ArrayList<String> list = zk.getChildren(root, true); + + if (list.size() < size) { + mutex.wait(); + } else { + return true; + } + } + } + } + + /** + * Leave barrier: wait until all processes have left + * + * @return + * @throws KeeperException + * @throws InterruptedException + */ + + boolean leave() throws KeeperException, InterruptedException{ + zk.delete(root + "/" + name, 0); + while (true) { + synchronized (mutex) { + 
ArrayList<String> list = zk.getChildren(root, true); + if (list.size() > 0) { + mutex.wait(); + } else { + return true; + } + } + } + } + } + + /** + * Producer-Consumer queue + */ + static public class Queue extends SyncPrimitive { + + /** + * Constructor of producer-consumer queue + * + * @param address + * @param name + */ + Queue(String address, String name) { + super(address); + this.root = name; + // Create ZK node name + if (zk != null) { + try { + Stat s = zk.exists(root, false); + if (s == null) { + zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0); + } + } catch (KeeperException e) { + System.out + .println("Keeper exception when instantiating queue: " + + e.toString()); + } catch (InterruptedException e) { + System.out.println("Interrupted exception"); + } + } + } + + /** + * Add element to the queue. + * + * @param i + * @return + */ + + boolean produce(int i) throws KeeperException, InterruptedException{ + ByteBuffer b = ByteBuffer.allocate(4); + byte[] value; + + // Add child with value i + b.putInt(i); + value = b.array(); + zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, + CreateFlags.SEQUENCE); + + return true; + } + + + /** + * Remove first element from the queue. 
+ * + * @return + * @throws KeeperException + * @throws InterruptedException + */ + int consume() throws KeeperException, InterruptedException{ + int retvalue = -1; + Stat stat = null; + + // Get the first element available + while (true) { + synchronized (mutex) { + ArrayList<String> list = zk.getChildren(root, true); + if (list.size() == 0) { + System.out.println("Going to wait"); + mutex.wait(); + } else { + Integer min = new Integer(list.get(0).substring(7)); + for(String s : list){ + Integer tempValue = new Integer(s.substring(7)); + //System.out.println("Temporary value: " + tempValue); + if(tempValue < min) min = tempValue; + } + System.out.println("Temporary value: " + root + "/element" + min); + byte[] b = zk.getData(root + "/element" + min, + false, stat); + zk.delete(root + "/element" + min, 0); + ByteBuffer buffer = ByteBuffer.wrap(b); + retvalue = buffer.getInt(); + + return retvalue; + } + } + } + } + } + + public static void main(String args[]) { + if (args[0].equals("qTest")) + queueTest(args); + else + barrierTest(args); + + } + + public static void queueTest(String args[]) { + Queue q = new Queue(args[1], "/app1"); + + System.out.println("Input: " + args[1]); + int i; + Integer max = new Integer(args[2]); + + if (args[3].equals("p")) { + System.out.println("Producer"); + for (i = 0; i < max; i++) + try{ + q.produce(10 + i); + } catch (KeeperException e){ + + } catch (InterruptedException e){ + + } + } else { + System.out.println("Consumer"); + + for (i = 0; i < max; i++) { + try{ + int r = q.consume(); + System.out.println("Item: " + r); + } catch (KeeperException e){ + i--; + } catch (InterruptedException e){ + + } + } + } + } + + public static void barrierTest(String args[]) { + Barrier b = new Barrier(args[1], "/b1", new Integer(args[2])); + try{ + boolean flag = b.enter(); + System.out.println("Entered barrier: " + args[2]); + if(!flag) System.out.println("Error when entering the barrier"); + } catch (KeeperException e){ + + } catch 
(InterruptedException e){ + + } + + // Generate random integer + Random rand = new Random(); + int r = rand.nextInt(100); + // Loop for rand iterations + for (int i = 0; i < r; i++) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + + } + } + try{ + b.leave(); + } catch (KeeperException e){ + + } catch (InterruptedException e){ + + } + System.out.println("Left barrier"); + } +}</programlisting></example> +</section> + +</article>
Propchange: hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml ------------------------------------------------------------------------------ svn:eol-style = native Added: hadoop/zookeeper/trunk/src/docs/src/documentation/resources/images/2pc.png URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/docs/src/documentation/resources/images/2pc.png?rev=703480&view=auto ============================================================================== Binary file - no diff available. Propchange: hadoop/zookeeper/trunk/src/docs/src/documentation/resources/images/2pc.png ------------------------------------------------------------------------------ svn:mime-type = image/png Modified: hadoop/zookeeper/trunk/src/docs/src/documentation/skinconf.xml URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/docs/src/documentation/skinconf.xml?rev=703480&r1=703479&r2=703480&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/docs/src/documentation/skinconf.xml (original) +++ hadoop/zookeeper/trunk/src/docs/src/documentation/skinconf.xml Fri Oct 10 06:58:03 2008 @@ -142,10 +142,20 @@ font-family: monospace; } + pre.code { + margin-left: 0em; + padding: 0.5em; + background-color: #f0f0f0; + font-family: monospace; + } + +<!-- patricks .code { font-family: "Courier New", Courier, monospace; font-size: 110%; } +--> + </extra-css> <colors>