Added: hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/TestDriver.cc
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/TestDriver.cc?rev=835560&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/TestDriver.cc (added)
+++ hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/TestDriver.cc Thu Nov 
12 21:27:47 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <cppunit/TestRunner.h>
+#include <cppunit/CompilerOutputter.h>
+#include <cppunit/TestResult.h>
+#include <cppunit/TestResultCollector.h>
+#include <cppunit/TextTestProgressListener.h>
+#include <cppunit/BriefTestProgressListener.h>
+#include <cppunit/extensions/TestFactoryRegistry.h>
+#include <stdexcept>
+#include <cppunit/Exception.h>
+#include <cppunit/TestFailure.h>
+#include <cppunit/XmlOutputter.h>
+#include <fstream>
+
+#include "Util.h"
+
+using namespace std;
+
+CPPUNIT_NS_BEGIN
+
+// Compiler-style outputter that reports every failure on a single line:
+// the location (formatted with "%p:%l: "), the short description and, when
+// present, the detail strings in square brackets.
+// NOTE(review): the name suggests the format targets Eclipse's error parser
+// -- confirm.
+class EclipseOutputter: public CompilerOutputter
+{
+public:
+    // Forwards to CompilerOutputter with the "%p:%l: " location format and
+    // keeps a reference to the stream for printFailureMessage().
+    EclipseOutputter(TestResultCollector *result,ostream &stream):
+        CompilerOutputter(result,stream,"%p:%l: "),stream_(stream)
+    {
+    }
+
+    // Deliberately prints nothing for the failed test name.
+    virtual void printFailedTestName( TestFailure *failure ){}
+
+    // Writes ": <short description>[ [<detail>, <detail>, ...]]" and a newline.
+    virtual void printFailureMessage( TestFailure *failure )
+    {
+        stream_<<": ";
+        Message message = failure->thrownException()->message();
+        stream_<< message.shortDescription();
+
+        // Join all detail strings with ", ".
+        string details;
+        const int detailTotal = message.detailCount();
+        for(int idx=0; idx<detailTotal; ++idx){
+            if(idx>0)
+                details+=", ";
+            details+=message.detailAt(idx);
+        }
+        if(!details.empty())
+            stream_ <<" ["<<details<<"]";
+        stream_<<"\n";
+    }
+
+    ostream& stream_;
+};
+
+CPPUNIT_NS_END
+
+// Test driver entry point: runs the registered CppUnit tests and prints the
+// failures in a compiler-compatible format.
+//
+// argv[1], when present, names the test to run ("all" selects everything);
+// any further arguments are stored in globalTestConfig as extra options.
+// Returns 0 when the run was successful and 1 otherwise, including the case
+// where the requested test path cannot be resolved.
+int main( int argc, char* argv[] ) {
+   globalTestConfig.addConfigFromCmdLine(argc,argv);
+
+   // Create the event manager and test controller
+   CPPUNIT_NS::TestResult controller;
+   // Add a listener that collects test results
+   CPPUNIT_NS::TestResultCollector result;
+   controller.addListener( &result );
+
+   // Add a listener that reports progress as tests run.
+   CPPUNIT_NS::BriefTestProgressListener progress;
+   controller.addListener( &progress );
+
+   CPPUNIT_NS::TestRunner runner;
+   runner.addTest( CPPUNIT_NS::TestFactoryRegistry::getRegistry().makeTest() );
+
+   try
+   {
+     cout << "Running "  <<  globalTestConfig.getTestName();
+     runner.run( controller, globalTestConfig.getTestName());
+     cout<<endl;
+
+     // Print test in a compiler compatible format.
+     CPPUNIT_NS::EclipseOutputter outputter( &result,cout);
+     outputter.write();
+
+     // Build with ENABLE_XML_OUTPUT defined to also write tests.xml
+#ifdef ENABLE_XML_OUTPUT
+     std::ofstream file( "tests.xml" );
+     CPPUNIT_NS::XmlOutputter xml( &result, file );
+     xml.setStyleSheet( "report.xsl" );
+     xml.write();
+     file.close();
+#endif
+   }
+   catch ( const std::invalid_argument &e )  // Test path not resolved
+   {
+     cout<<"\nERROR: "<<e.what()<<endl;
+     // No test was run, so this must not be reported as a successful run.
+     return 1;
+   }
+
+   return result.wasSuccessful() ? 0 : 1;
+}

Added: hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.cc
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.cc?rev=835560&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.cc (added)
+++ hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.cc Thu Nov 12 
21:27:47 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Util.h"
+
+#include <errno.h>
+#include <time.h>
+
+// Definition of the shared empty string declared extern in Util.h;
+// TestConfig::getTestName() returns it when the test name is "all".
+const std::string EMPTY_STRING;
+
+// Definition of the global test configuration declared extern in Util.h.
+TestConfig globalTestConfig;
+
+// Sleeps for ms milliseconds. Zero or negative values return immediately.
+void millisleep(int ms){
+    if(ms<=0)
+        return;
+    timespec ts;
+    ts.tv_sec=ms/1000;
+    ts.tv_nsec=(ms%1000)*1000000L; // to nanoseconds
+    // When a signal interrupts nanosleep() it stores the time still to be
+    // slept in its second argument, so keep going until the whole interval
+    // has elapsed instead of returning early.
+    while(nanosleep(&ts,&ts)==-1 && errno==EINTR)
+        ;
+}

Added: hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.h
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.h?rev=835560&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.h (added)
+++ hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.h Thu Nov 12 
21:27:47 2009
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef UTIL_H_
+#define UTIL_H_
+
+#include <map>
+#include <vector>
+#include <string>
+
+// number of elements in array
+// (fully parenthesized so that expressions such as 10 / COUNTOF(a) or
+// COUNTOF(*p) group the way the caller expects)
+#define COUNTOF(array) (sizeof(array)/sizeof((array)[0]))
+
+// Declares the two C-linkage functions __real_<sym> and __wrap_<sym>, both
+// with return type `ret` and the parenthesized parameter list `sig`.
+// NOTE(review): the names follow the GNU ld --wrap=<sym> convention --
+// confirm against the linker flags used by the test build.
+#define DECLARE_WRAPPER(ret,sym,sig) \
+    extern "C" ret __real_##sym sig; \
+    extern "C" ret __wrap_##sym sig
+
+// Calls __real_<sym> with the parenthesized argument list `params`.
+#define CALL_REAL(sym,params) \
+    __real_##sym params
+
+// must include "src/zookeeper_log.h" to be able to use this macro
+// Expands to a log_message() call tagged with the current line and function;
+// x is the parenthesized argument list handed to format_log_message.
+// NOTE(review): the literal 3 is presumably the log level -- confirm in
+// zookeeper_log.h.
+#define TEST_TRACE(x) \
+    log_message(3,__LINE__,__func__,format_log_message x)
+
+extern const std::string EMPTY_STRING;
+
+// *****************************************************************************
+// A bit of wizardry to get to the bare type from a reference or a pointer 
+// to the type
+//
+// For every T the trait exposes two typedefs:
+//   ArgT  - T exactly as given
+//   BareT - T with reference and pointer layers removed
+
+// primary template: T is neither a reference nor a pointer, so both
+// typedefs are T itself
+template <class T>
+struct TypeOp {
+    typedef T BareT;
+    typedef T ArgT;
+};
+
+// partial specialization for reference types
+template <class T>
+struct TypeOp<T&>{
+    typedef T& ArgT;
+    // recurse on T so that a reference to a pointer is stripped as well
+    typedef typename TypeOp<T>::BareT BareT;
+};
+
+// partial specialization for pointers
+template <class T>
+struct TypeOp<T*>{
+    typedef T* ArgT;
+    // recurse on T so that pointers to pointers are stripped as well
+    typedef typename TypeOp<T>::BareT BareT;
+};
+
+// *****************************************************************************
+// Container utilities
+
+// Stores v under key k, replacing any value already stored for that key.
+template <class K, class V>
+void putValue(std::map<K,V>& map,const K& k, const V& v){
+    // operator[] inserts the key when it is missing, so one lookup handles
+    // both the insert and the overwrite case.
+    map[k]=v;
+}
+
+// Looks up k in map. On a hit the mapped value is copied into v and true is
+// returned; on a miss v is left untouched and false is returned.
+template <class K, class V>
+bool getValue(const std::map<K,V>& map,const K& k,V& v){
+    const typename std::map<K,V>::const_iterator pos=map.find(k);
+    const bool found=(pos!=map.end());
+    if(found)
+        v=pos->second;
+    return found;
+}
+
+// *****************************************************************************
+// misc utils
+
+// millisecond sleep
+void millisleep(int ms);
+// evaluate given predicate until it returns true or the timeout 
+// (in millis) has expired
+// The predicate is polled every 2 ms. The return value is the waiting time
+// counted in those 2 ms steps, so it is an approximation and can overshoot
+// timeout by less than one step.
+template<class Predicate>
+int ensureCondition(const Predicate& p,int timeout){
+    const int pollIntervalMs=2;
+    int elapsed=0;
+    while(!p() && elapsed<timeout){
+        millisleep(pollIntervalMs);
+        elapsed+=pollIntervalMs;
+    }
+    return elapsed;
+}
+
+// *****************************************************************************
+// test global configuration data 
+// Holds the test name (first command line argument) and every argument
+// after it as an ordered list of extra options.
+class TestConfig{
+    typedef std::vector<std::string> CmdLineOptList;
+public:
+    typedef CmdLineOptList::const_iterator const_iterator;
+    TestConfig(){}
+    ~TestConfig(){}
+
+    // Takes argv[1] as the test name and appends argv[2..argc-1] to the
+    // extra options. Does nothing when there is no argument besides argv[0].
+    void addConfigFromCmdLine(int argc, char* argv[]){
+        if(argc<2)
+            return;
+        testName_=argv[1];
+        cmdOpts_.insert(cmdOpts_.end(),argv+2,argv+argc);
+    }
+
+    // Iteration over / number of the extra options.
+    const_iterator getExtraOptBegin() const {return cmdOpts_.begin();}
+    const_iterator getExtraOptEnd() const {return cmdOpts_.end();}
+    size_t getExtraOptCount() const {
+        return cmdOpts_.size();
+    }
+
+    // Name of the test to run; "all" is reported as the empty string.
+    const std::string& getTestName() const {
+        if(testName_=="all")
+            return EMPTY_STRING;
+        return testName_;
+    }
+private:
+    CmdLineOptList cmdOpts_;
+    std::string testName_;
+};
+
+extern TestConfig globalTestConfig;
+
+#endif /*UTIL_H_*/

Added: hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/zkServer.sh
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/zkServer.sh?rev=835560&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/zkServer.sh (added)
+++ hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/zkServer.sh Thu Nov 12 
21:27:47 2009
@@ -0,0 +1,75 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Starts or stops the ZooKeeper server used by the tests.
+#   startClean  wipe /tmp/zkdata, then start
+#   start       start, keeping existing data
+#   stop        stop the server recorded in /tmp/zk.pid
+# The server listens on port 22181 and logs to /tmp/zk.log.
+if [ "x$1" == "x" ]
+then
+    echo "USAGE: $0 startClean|start|stop hostPorts"
+    exit 2
+fi
+
+if [ "x$1" == "xstartClean" ]
+then
+    rm -rf /tmp/zkdata
+fi
+
+# Make sure nothing is left over from before
+if [ -r "/tmp/zk.pid" ]
+then
+    pid=$(cat /tmp/zk.pid)
+    kill -9 "$pid"
+    rm -f /tmp/zk.pid
+fi
+
+base_dir="../../../../.."
+
+CLASSPATH="$CLASSPATH:${base_dir}/build/classes"
+CLASSPATH="$CLASSPATH:${base_dir}/conf"
+
+# A glob that matches nothing stays as the literal pattern, so skip entries
+# that do not exist instead of putting the pattern on the classpath.
+for f in "${base_dir}"/zookeeper-*.jar
+do
+    [ -e "$f" ] || continue
+    CLASSPATH="$CLASSPATH:$f"
+done
+
+for i in "${base_dir}"/build/lib/*.jar
+do
+    [ -e "$i" ] || continue
+    CLASSPATH="$CLASSPATH:$i"
+done
+
+for i in "${base_dir}"/src/java/lib/*.jar
+do
+    [ -e "$i" ] || continue
+    CLASSPATH="$CLASSPATH:$i"
+done
+
+CLASSPATH="$CLASSPATH:${CLOVER_HOME}/lib/clover.jar"
+
+case $1 in
+start|startClean)
+    mkdir -p /tmp/zkdata
+    # Quote the classpath so paths containing spaces stay one argument.
+    java -cp "$CLASSPATH" org.apache.zookeeper.server.ZooKeeperServerMain 22181 /tmp/zkdata &> /tmp/zk.log &
+    echo $! > /tmp/zk.pid
+    # Give the server time to come up before the tests connect.
+    sleep 5
+    ;;
+stop)
+    # Already killed above
+    ;;
+*)
+    echo "Unknown command: $1"
+    exit 2
+esac
+

Propchange: hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/zkServer.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: 
hadoop/zookeeper/trunk/src/recipes/queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/recipes/queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java?rev=835560&view=auto
==============================================================================
--- 
hadoop/zookeeper/trunk/src/recipes/queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java
 (added)
+++ 
hadoop/zookeeper/trunk/src/recipes/queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java
 Thu Nov 12 21:27:47 2009
@@ -0,0 +1,312 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.recipes.queue;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * 
+ * A <a href="package.html">protocol to implement a distributed queue</a>.
+ * 
+ */
+
+public class DistributedQueue {
+    private static final Logger LOG = Logger.getLogger(DistributedQueue.class);
+
+    /** Path of the znode whose children are the queue elements. */
+    private final String dir;
+
+    private ZooKeeper zookeeper;
+    /** ACL applied to every znode this class creates. */
+    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+    /** Name prefix of element znodes; the sequence number follows it. */
+    private final String prefix = "qn-";
+
+
+    /**
+     * @param zookeeper client used for all queue operations
+     * @param dir path of the znode that holds the queue elements
+     * @param acl ACL for created znodes; null selects OPEN_ACL_UNSAFE
+     */
+    public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl){
+        this.dir = dir;
+
+        if(acl != null){
+            this.acl = acl;
+        }
+        this.zookeeper = zookeeper;
+
+    }
+
+    /**
+     * Creates the queue znode, tolerating the case where another client
+     * creates it at the same time.
+     */
+    private void createQueueDir() throws KeeperException, InterruptedException {
+        try{
+            zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
+        }catch(KeeperException.NodeExistsException e){
+            // Another client created the znode after our operation failed
+            // with NoNode; it exists now, which is all that is needed.
+        }
+    }
+
+    /**
+     * Returns a Map of the children, ordered by id.
+     * Children whose name is not the prefix followed by a number are logged
+     * and left out.
+     * @param watcher optional watcher on getChildren() operation.
+     * @return map from id to child name for all children
+     * @throws KeeperException.NoNodeException if the queue znode does not exist
+     */
+    private TreeMap<Long,String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {
+        TreeMap<Long,String> orderedChildren = new TreeMap<Long,String>();
+
+        List<String> childNames = zookeeper.getChildren(dir, watcher);
+
+        for(String childName : childNames){
+            //Check format
+            if(!childName.startsWith(prefix)){
+                LOG.warn("Found child node with improper name: " + childName);
+                continue;
+            }
+            try{
+                String suffix = childName.substring(prefix.length());
+                orderedChildren.put(Long.valueOf(suffix),childName);
+            }catch(NumberFormatException e){
+                LOG.warn("Found child node with improper format : " + childName + " " + e,e);
+            }
+        }
+
+        return orderedChildren;
+    }
+
+    /**
+     * Find the smallest child node.
+     * @return The name of the smallest child node, or null if the queue
+     * znode does not exist or has no properly named child.
+     */
+    private String smallestChildName() throws KeeperException, InterruptedException {
+        long minId = Long.MAX_VALUE;
+        String minName = "";
+
+        List<String> childNames = null;
+
+        try{
+            childNames = zookeeper.getChildren(dir, false);
+        }catch(KeeperException.NoNodeException e){
+            LOG.warn("Caught: " +e,e);
+            return null;
+        }
+
+        for(String childName : childNames){
+            //Check format
+            if(!childName.startsWith(prefix)){
+                LOG.warn("Found child node with improper name: " + childName);
+                continue;
+            }
+            try{
+                String suffix = childName.substring(prefix.length());
+                long childId = Long.parseLong(suffix);
+                if(childId < minId){
+                    minId = childId;
+                    minName = childName;
+                }
+            }catch(NumberFormatException e){
+                LOG.warn("Found child node with improper format : " + childName + " " + e,e);
+            }
+        }
+
+
+        if(minId < Long.MAX_VALUE){
+            return minName;
+        }else{
+            return null;
+        }
+    }
+
+    /**
+     * Return the head of the queue without modifying the queue.
+     * @return the data at the head of the queue.
+     * @throws NoSuchElementException if the queue znode does not exist or
+     * has no element
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException {
+        TreeMap<Long,String> orderedChildren;
+
+        // element, take, and remove follow the same pattern.
+        // We want to return the child node with the smallest sequence number.
+        // Since other clients are remove()ing and take()ing nodes concurrently,
+        // the child with the smallest sequence number in orderedChildren
+        // might be gone by the time we check.
+        // We don't call getChildren again until we have tried the rest of the
+        // nodes in sequence order.
+        while(true){
+            try{
+                orderedChildren = orderedChildren(null);
+            }catch(KeeperException.NoNodeException e){
+                throw new NoSuchElementException();
+            }
+            if(orderedChildren.isEmpty()) throw new NoSuchElementException();
+
+            for(String headNode : orderedChildren.values()){
+                try{
+                    return zookeeper.getData(dir+"/"+headNode, false, null);
+                }catch(KeeperException.NoNodeException e){
+                    //Another client removed the node first, try next
+                }
+            }
+
+        }
+    }
+
+
+    /**
+     * Attempts to remove the head of the queue and return it.
+     * @return The former head of the queue
+     * @throws NoSuchElementException if the queue znode does not exist or
+     * has no element
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
+        TreeMap<Long,String> orderedChildren;
+        // Same as for element.  Should refactor this.
+        while(true){
+            try{
+                orderedChildren = orderedChildren(null);
+            }catch(KeeperException.NoNodeException e){
+                throw new NoSuchElementException();
+            }
+            if(orderedChildren.isEmpty()) throw new NoSuchElementException();
+
+            for(String headNode : orderedChildren.values()){
+                String path = dir +"/"+headNode;
+                try{
+                    byte[] data = zookeeper.getData(path, false, null);
+                    // Only the client whose delete succeeds owns the element.
+                    zookeeper.delete(path, -1);
+                    return data;
+                }catch(KeeperException.NoNodeException e){
+                    // Another client deleted the node first.
+                }
+            }
+
+        }
+    }
+
+    /** Watcher that releases a latch the first time it is notified. */
+    private class LatchChildWatcher implements Watcher {
+
+        CountDownLatch latch;
+
+        public LatchChildWatcher(){
+            latch = new CountDownLatch(1);
+        }
+
+        public void process(WatchedEvent event){
+            LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +
+                    event.getState() + " type " + event.getType());
+            latch.countDown();
+        }
+        public void await() throws InterruptedException {
+            latch.await();
+        }
+    }
+
+    /**
+     * Removes the head of the queue and returns it, blocks until it succeeds.
+     * Creates the queue znode if it does not exist yet.
+     * @return The former head of the queue
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] take() throws KeeperException, InterruptedException {
+        TreeMap<Long,String> orderedChildren;
+        // Same as for element.  Should refactor this.
+        while(true){
+            LatchChildWatcher childWatcher = new LatchChildWatcher();
+            try{
+                orderedChildren = orderedChildren(childWatcher);
+            }catch(KeeperException.NoNodeException e){
+                createQueueDir();
+                continue;
+            }
+            if(orderedChildren.isEmpty()){
+                // Nothing to take yet: wait until the watcher set by
+                // getChildren() fires, then look again.
+                childWatcher.await();
+                continue;
+            }
+
+            for(String headNode : orderedChildren.values()){
+                String path = dir +"/"+headNode;
+                try{
+                    byte[] data = zookeeper.getData(path, false, null);
+                    zookeeper.delete(path, -1);
+                    return data;
+                }catch(KeeperException.NoNodeException e){
+                    // Another client deleted the node first.
+                }
+            }
+        }
+    }
+
+    /**
+     * Inserts data into queue.
+     * Creates the queue znode if it does not exist yet.
+     * @param data
+     * @return true if data was successfully added
+     */
+    public boolean offer(byte[] data) throws KeeperException, InterruptedException{
+        for(;;){
+            try{
+                zookeeper.create(dir+"/"+prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
+                return true;
+            }catch(KeeperException.NoNodeException e){
+                createQueueDir();
+            }
+        }
+
+    }
+
+    /**
+     * Returns the data at the first element of the queue, or null if the
+     * queue is empty.
+     * @return data at the first element of the queue, or null.
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] peek() throws KeeperException, InterruptedException{
+        try{
+            return element();
+        }catch(NoSuchElementException e){
+            return null;
+        }
+    }
+
+
+    /**
+     * Attempts to remove the head of the queue and return it. Returns null if
+     * the queue is empty.
+     * @return Head of the queue or null.
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] poll() throws KeeperException, InterruptedException {
+        try{
+            return remove();
+        }catch(NoSuchElementException e){
+            return null;
+        }
+    }
+
+
+
+}

Added: 
hadoop/zookeeper/trunk/src/recipes/queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/recipes/queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java?rev=835560&view=auto
==============================================================================
--- 
hadoop/zookeeper/trunk/src/recipes/queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
 (added)
+++ 
hadoop/zookeeper/trunk/src/recipes/queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
 Thu Nov 12 21:27:47 2009
@@ -0,0 +1,273 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.recipes.queue;
+
+import java.util.Calendar;
+import java.util.NoSuchElementException;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.recipes.queue.DistributedQueue;
+import org.apache.zookeeper.test.ClientBase;
+
+
+
+/**
+ * Tests for DistributedQueue. Every test uses its own queue znode.
+ */
+public class DistributedQueueTest extends ClientBase {
+    private static final String TEST_STRING = "Hello World";
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        LOG.info("FINISHED " + getName());
+    }
+
+    /**
+     * Creates numClients queue handles for the queue at dir, each backed by
+     * its own client connection.
+     */
+    private DistributedQueue[] createQueueHandles(String dir, int numClients) throws Exception {
+        DistributedQueue queueHandles[] = new DistributedQueue[numClients];
+        for(int i=0; i < numClients; i++){
+            ZooKeeper client = createClient();
+            queueHandles[i] = new DistributedQueue(client, dir, null);
+        }
+        return queueHandles;
+    }
+
+    /**
+     * Starts a take() on a background thread, offers payload about one
+     * second later from a second thread and checks that take() returned it.
+     * Exceptions raised in either thread fail the test.
+     */
+    private void assertTakeReceivesLaterOffer(final DistributedQueue queue, final String payload) throws Exception {
+        final byte[] takeResult[] = new byte[1][];
+        // [0] = failure of the take thread, [1] = failure of the offer thread
+        final Exception failures[] = new Exception[2];
+        Thread takeThread = new Thread(){
+            public void run(){
+                try{
+                    takeResult[0] = queue.take();
+                }catch(KeeperException e){
+                    failures[0] = e;
+                }catch(InterruptedException e){
+                    failures[0] = e;
+                }
+            }
+        };
+        takeThread.start();
+
+        // Give take() time to start before anything is offered.
+        Thread.sleep(1000);
+        Thread offerThread= new Thread() {
+            public void run(){
+                try {
+                    queue.offer(payload.getBytes());
+                } catch (KeeperException e) {
+                    failures[1] = e;
+                } catch (InterruptedException e) {
+                    failures[1] = e;
+                }
+            }
+        };
+        offerThread.start();
+        offerThread.join();
+
+        if(failures[1] != null){
+            // Nothing was offered, so take() would block forever.
+            takeThread.interrupt();
+            takeThread.join();
+            throw failures[1];
+        }
+        takeThread.join();
+
+        if(failures[0] != null){
+            throw failures[0];
+        }
+        assertNotNull(takeResult[0]);
+        assertEquals(payload, new String(takeResult[0]));
+    }
+
+    public void testOffer1() throws Exception {
+        DistributedQueue queueHandles[] = createQueueHandles("/testOffer1", 1);
+
+        queueHandles[0].offer(TEST_STRING.getBytes());
+
+        byte dequeuedBytes[] = queueHandles[0].remove();
+        assertEquals(TEST_STRING, new String(dequeuedBytes));
+    }
+
+    public void testOffer2() throws Exception {
+        DistributedQueue queueHandles[] = createQueueHandles("/testOffer2", 2);
+
+        queueHandles[0].offer(TEST_STRING.getBytes());
+
+        byte dequeuedBytes[] = queueHandles[1].remove();
+        assertEquals(TEST_STRING, new String(dequeuedBytes));
+    }
+
+    public void testTake1() throws Exception {
+        DistributedQueue queueHandles[] = createQueueHandles("/testTake1", 1);
+
+        queueHandles[0].offer(TEST_STRING.getBytes());
+
+        byte dequeuedBytes[] = queueHandles[0].take();
+        assertEquals(TEST_STRING, new String(dequeuedBytes));
+    }
+
+    public void testRemove1() throws Exception{
+        DistributedQueue queueHandles[] = createQueueHandles("/testRemove1", 1);
+
+        try{
+            queueHandles[0].remove();
+            fail("remove() on an empty queue must throw NoSuchElementException");
+        }catch(NoSuchElementException e){
+            // expected
+        }
+    }
+
+    /**
+     * Offers n elements through one client, removes m of them through a
+     * second client and checks that the last one removed is element m-1.
+     * Requires m &gt;= 1.
+     */
+    public void createNremoveMtest(String dir,int n,int m) throws Exception{
+        DistributedQueue queueHandles[] = createQueueHandles(dir, 2);
+
+        for(int i=0; i< n; i++){
+            String offerString = TEST_STRING + i;
+            queueHandles[0].offer(offerString.getBytes());
+        }
+
+        byte data[] = null;
+        for(int i=0; i<m; i++){
+            data=queueHandles[1].remove();
+        }
+        assertEquals(TEST_STRING+(m-1), new String(data));
+    }
+
+    public void testRemove2() throws Exception{
+        createNremoveMtest("/testRemove2",10,2);
+    }
+    public void testRemove3() throws Exception{
+        createNremoveMtest("/testRemove3",1000,1000);
+    }
+
+    /**
+     * Offers n elements through one client, removes m of them through a
+     * second client and checks that element() then returns element m.
+     */
+    public void createNremoveMelementTest(String dir,int n,int m) throws Exception{
+        DistributedQueue queueHandles[] = createQueueHandles(dir, 2);
+
+        for(int i=0; i< n; i++){
+            String offerString = TEST_STRING + i;
+            queueHandles[0].offer(offerString.getBytes());
+        }
+
+        for(int i=0; i<m; i++){
+            queueHandles[1].remove();
+        }
+        assertEquals(TEST_STRING+m, new String(queueHandles[1].element()));
+    }
+
+    public void testElement1() throws Exception {
+        createNremoveMelementTest("/testElement1",1,0);
+    }
+
+    public void testElement2() throws Exception {
+        createNremoveMelementTest("/testElement2",10,2);
+    }
+
+    public void testElement3() throws Exception {
+        createNremoveMelementTest("/testElement3",1000,500);
+    }
+
+    public void testElement4() throws Exception {
+        createNremoveMelementTest("/testElement4",1000,1000-1);
+    }
+
+    public void testTakeWait1() throws Exception{
+        DistributedQueue queueHandles[] = createQueueHandles("/testTakeWait1", 1);
+
+        assertTakeReceivesLaterOffer(queueHandles[0], TEST_STRING);
+    }
+
+    public void testTakeWait2() throws Exception{
+        DistributedQueue queueHandles[] = createQueueHandles("/testTakeWait2", 1);
+
+        int num_attempts =2;
+        for(int i=0; i< num_attempts; i++){
+            assertTakeReceivesLaterOffer(queueHandles[0], TEST_STRING + i);
+        }
+    }
+}
+


Reply via email to