Modified: hadoop/zookeeper/trunk/src/c/tests/ZKMocks.cc
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/ZKMocks.cc?rev=679557&r1=679556&r2=679557&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/ZKMocks.cc (original)
+++ hadoop/zookeeper/trunk/src/c/tests/ZKMocks.cc Thu Jul 24 14:46:30 2008
@@ -66,17 +66,26 @@
 
 // 
*****************************************************************************
 // watcher action implementation
-void activeWatcher(zhandle_t *zh, int type, int state, const char *path){
-    if(zh==0 || zoo_get_context(zh)==0) return;
-    WatcherAction* action=(WatcherAction*)zoo_get_context(zh);
-    action->setWatcherTriggered();    
+void activeWatcher(zhandle_t *zh, int type, int state, const char *path,void* 
ctx){
+    if(zh==0 || ctx==0) return;
+    WatcherAction* action=(WatcherAction*)ctx;
     
-    if(type==SESSION_EVENT && state==EXPIRED_SESSION_STATE)
-        action->onSessionExpired(zh);
-    if(type==CHANGED_EVENT)
+    if(type==SESSION_EVENT){
+        if(state==EXPIRED_SESSION_STATE)
+            action->onSessionExpired(zh);
+        else if(state==CONNECTING_STATE)
+            action->onConnectionLost(zh);
+        else if(state==CONNECTED_STATE)
+            action->onConnectionEstablished(zh);
+    }else if(type==CHANGED_EVENT)
         action->onNodeValueChanged(zh,path);
+    else if(type==DELETED_EVENT)
+        action->onNodeDeleted(zh,path);
+    else if(type==CHILD_EVENT)
+        action->onChildChanged(zh,path);
     // TODO: implement for the rest of the event types
     // ...
+    action->setWatcherTriggered();    
 }
 SyncedBoolCondition WatcherAction::isWatcherTriggered() const{
     return SyncedBoolCondition(triggered_,mx_);
@@ -145,6 +154,167 @@
 Mock_get_xid* Mock_get_xid::mock_=0;
 
 
//******************************************************************************
+// activateWatcher mock
+
+DECLARE_WRAPPER(void,activateWatcher,(watcher_registration_t* reg, int rc))
+{
+    if(!Mock_activateWatcher::mock_){
+        CALL_REAL(activateWatcher,(reg,rc));
+    }else{
+        Mock_activateWatcher::mock_->call(reg,rc);
+    }
+}
+Mock_activateWatcher* Mock_activateWatcher::mock_=0;
+
+class ActivateWatcherWrapper: public Mock_activateWatcher{
+public:
+    ActivateWatcherWrapper():ctx_(0),activated_(false){}
+    
+    virtual void call(watcher_registration_t* reg, int rc){
+        CALL_REAL(activateWatcher,(reg,rc));
+        synchronized(mx_);
+        if(reg->context==ctx_){
+            activated_=true;
+            ctx_=0;
+        }
+    }
+    
+    void setContext(void* ctx){
+        synchronized(mx_);
+        ctx_=ctx;
+        activated_=false;
+    }
+    
+    SyncedBoolCondition isActivated() const{
+        return SyncedBoolCondition(activated_,mx_);
+    }
+    mutable Mutex mx_;
+    void* ctx_;
+    bool activated_;
+};
+
+WatcherActivationTracker::WatcherActivationTracker():
+    wrapper_(new ActivateWatcherWrapper)
+{    
+}
+
+WatcherActivationTracker::~WatcherActivationTracker(){
+    delete wrapper_;
+}
+
+void WatcherActivationTracker::track(void* ctx){
+    wrapper_->setContext(ctx);
+}
+
+SyncedBoolCondition WatcherActivationTracker::isWatcherActivated() const{
+    return wrapper_->isActivated();
+}
+
+//******************************************************************************
+// deliverWatchers mock
+DECLARE_WRAPPER(void,deliverWatchers,(zhandle_t* zh,int type,int state, const 
char* path))
+{
+    if(!Mock_deliverWatchers::mock_){
+        CALL_REAL(deliverWatchers,(zh,type,state,path));
+    }else{
+        Mock_deliverWatchers::mock_->call(zh,type,state,path);
+    }
+}
+
+Mock_deliverWatchers* Mock_deliverWatchers::mock_=0;
+
+struct RefCounterValue{
+    RefCounterValue(zhandle_t* const& zh,int32_t expectedCounter,Mutex& mx):
+        zh_(zh),expectedCounter_(expectedCounter),mx_(mx){}
+    bool operator()() const{
+        {
+            synchronized(mx_);
+            if(zh_==0)
+                return false;
+        }
+        return inc_ref_counter(zh_,0)==expectedCounter_;
+    }
+    zhandle_t* const& zh_;
+    int32_t expectedCounter_;
+    Mutex& mx_;
+};
+
+
+class DeliverWatchersWrapper: public Mock_deliverWatchers{
+public:
+    DeliverWatchersWrapper(int type,int state,bool terminate):
+        type_(type),state_(state),
+        allDelivered_(false),terminate_(terminate),zh_(0),deliveryCounter_(0){}
+    virtual void call(zhandle_t* zh,int type,int state, const char* path){
+        {
+            synchronized(mx_);
+            zh_=zh;
+            allDelivered_=false;
+        }
+        CALL_REAL(deliverWatchers,(zh,type,state,path));
+        if(type_==type && state_==state){
+            if(terminate_){
+                // prevent zhandle_t from being prematurely destroyed;
+                // this will also ensure that zookeeper_close() cleans up the 
thread
+                // resources by calling finish_adaptor()
+                inc_ref_counter(zh,1);
+                terminateZookeeperThreads(zh);
+            }
+            synchronized(mx_);
+            allDelivered_=true;
+            deliveryCounter_++;
+        }
+    }
+    SyncedBoolCondition isDelivered() const{
+        if(terminate_){
+            int i=ensureCondition(RefCounterValue(zh_,1,mx_),1000);
+            assert(i<1000);
+        }
+        return SyncedBoolCondition(allDelivered_,mx_);
+    }
+    void resetDeliveryCounter(){
+        synchronized(mx_);
+        deliveryCounter_=0;
+    }
+    SyncedIntegerEqual deliveryCounterEquals(int expected) const{
+        if(terminate_){
+            int i=ensureCondition(RefCounterValue(zh_,1,mx_),1000);
+            assert(i<1000);
+        }
+        return SyncedIntegerEqual(deliveryCounter_,expected,mx_);
+    }
+    int type_;
+    int state_;
+    mutable Mutex mx_;
+    bool allDelivered_;
+    bool terminate_;
+    zhandle_t* zh_;
+    int deliveryCounter_;
+};
+
+WatcherDeliveryTracker::WatcherDeliveryTracker(
+        int type,int state,bool terminateCompletionThread):
+    deliveryWrapper_(new DeliverWatchersWrapper(
+            type,state,terminateCompletionThread)){
+}
+
+WatcherDeliveryTracker::~WatcherDeliveryTracker(){
+    delete deliveryWrapper_;
+}
+
+SyncedBoolCondition WatcherDeliveryTracker::isWatcherProcessingCompleted() 
const{
+    return deliveryWrapper_->isDelivered();
+}
+
+void WatcherDeliveryTracker::resetDeliveryCounter(){
+    deliveryWrapper_->resetDeliveryCounter();
+}
+
+SyncedIntegerEqual WatcherDeliveryTracker::deliveryCounterEquals(int expected) 
const{
+    return deliveryWrapper_->deliveryCounterEquals(expected);
+}
+
+//******************************************************************************
 //
 string HandshakeResponse::toString() const {
     string buf;
@@ -185,6 +355,45 @@
     return res;
 }
 
+string ZooStatResponse::toString() const{
+    oarchive* oa=create_buffer_oarchive();
+    
+    ReplyHeader h = {xid_,1,rc_};
+    serialize_ReplyHeader(oa, "hdr", &h);
+    
+    SetDataResponse resp;
+    resp.stat=stat_;
+    serialize_SetDataResponse(oa, "reply", &resp);
+    int32_t len=htonl(get_buffer_len(oa));
+    string res((char*)&len,sizeof(len));
+    res.append(get_buffer(oa),get_buffer_len(oa));
+    
+    close_buffer_oarchive(&oa,1);
+    return res;
+}
+
+string ZooGetChildrenResponse::toString() const{
+    oarchive* oa=create_buffer_oarchive();
+    
+    ReplyHeader h = {xid_,1,rc_};
+    serialize_ReplyHeader(oa, "hdr", &h);
+ 
+    GetChildrenResponse resp;
+    // populate the string vector
+    allocate_String_vector(&resp.children,strings_.size());
+    for(int i=0;i<(int)strings_.size();++i)
+        resp.children.data[i]=strdup(strings_[i].c_str());
+    serialize_GetChildrenResponse(oa, "reply", &resp);
+    deallocate_GetChildrenResponse(&resp);
+    
+    int32_t len=htonl(get_buffer_len(oa));
+    string res((char*)&len,sizeof(len));
+    res.append(get_buffer(oa),get_buffer_len(oa));
+    
+    close_buffer_oarchive(&oa,1);
+    return res;
+}
+
 string ZNodeEvent::toString() const{
     oarchive* oa=create_buffer_oarchive();
     struct WatcherEvent evt = {type_,0,(char*)path_.c_str()};
@@ -219,7 +428,7 @@
 // Zookeeper server simulator
 // 
 bool ZookeeperServer::hasMoreRecv() const{
-  return recvHasMore.get()!=0;
+  return recvHasMore.get()!=0  || connectionLost;
 }
 
 ssize_t ZookeeperServer::callRecv(int s,void *buf,size_t len,int flags){
@@ -295,3 +504,8 @@
     gettimeofday(&zh->last_recv,0);    
     gettimeofday(&zh->last_send,0);    
 }
+
+void terminateZookeeperThreads(zhandle_t* zh){
+    // this will cause the zookeeper threads to terminate
+    zh->close_requested=1;
+}

Modified: hadoop/zookeeper/trunk/src/c/tests/ZKMocks.h
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/ZKMocks.h?rev=679557&r1=679556&r2=679557&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/ZKMocks.h (original)
+++ hadoop/zookeeper/trunk/src/c/tests/ZKMocks.h Thu Jul 24 14:46:30 2008
@@ -32,6 +32,11 @@
 // Async API tests!
 void forceConnected(zhandle_t* zh); 
 
+/**
+ * Requests termination of the zookeeper I/O and completion threads by
+ * setting zh->close_requested.
+ */
+void terminateZookeeperThreads(zhandle_t* zh);
+
 // 
*****************************************************************************
 // Abstract watcher action
 struct SyncedBoolCondition;
@@ -42,7 +47,11 @@
     virtual ~WatcherAction(){}
     
     virtual void onSessionExpired(zhandle_t*){}
+    virtual void onConnectionEstablished(zhandle_t*){}
+    virtual void onConnectionLost(zhandle_t*){}
     virtual void onNodeValueChanged(zhandle_t*,const char* path){}
+    virtual void onNodeDeleted(zhandle_t*,const char* path){}
+    virtual void onChildChanged(zhandle_t*,const char* path){}
     
     SyncedBoolCondition isWatcherTriggered() const;
     void setWatcherTriggered(){
@@ -57,7 +66,7 @@
 // zh->context is a pointer to a WatcherAction instance
 // based on the event type and state, the watcher calls a specific watcher 
 // action method
-void activeWatcher(zhandle_t *zh, int type, int state, const char *path);
+void activeWatcher(zhandle_t *zh, int type, int state, const char *path,void* 
ctx);
 
 // 
*****************************************************************************
 // a set of async completion signatures
@@ -104,6 +113,7 @@
     zhandle_t* zh_;
 };
 
+// a synchronized boolean condition
 struct SyncedBoolCondition{
     SyncedBoolCondition(const bool& cond,Mutex& mx):cond_(cond),mx_(mx){}
     bool operator()() const{
@@ -113,6 +123,20 @@
     const bool& cond_;
     Mutex& mx_;
 };
+
+// a synchronized integer comparison
+struct SyncedIntegerEqual{
+    SyncedIntegerEqual(const int& cond,int expected,Mutex& mx):
+        cond_(cond),expected_(expected),mx_(mx){}
+    bool operator()() const{
+        synchronized(mx_);
+        return cond_==expected_;
+    }
+    const int& cond_;
+    const int expected_;
+    Mutex& mx_;
+};
+
 // 
*****************************************************************************
 // make sure to call zookeeper_close() even in presence of exceptions 
 struct CloseFinally{
@@ -158,7 +182,6 @@
 
 // 
*****************************************************************************
 // flush_send_queue
-
 class Mock_flush_send_queue: public Mock
 {
 public:
@@ -177,7 +200,6 @@
 
 // 
*****************************************************************************
 // get_xid
-
 class Mock_get_xid: public Mock
 {
 public:
@@ -194,6 +216,57 @@
 };
 
 // 
*****************************************************************************
+// activateWatcher
+class Mock_activateWatcher: public Mock{
+public:
+    Mock_activateWatcher(){mock_=this;}
+    virtual ~Mock_activateWatcher(){mock_=0;}
+    
+    virtual void call(watcher_registration_t* reg, int rc){}
+    static Mock_activateWatcher* mock_;
+};
+
+class ActivateWatcherWrapper;
+class WatcherActivationTracker{
+public:
+    WatcherActivationTracker();
+    ~WatcherActivationTracker();
+    
+    void track(void* ctx);
+    SyncedBoolCondition isWatcherActivated() const;
+private:
+    ActivateWatcherWrapper* wrapper_;
+};
+
+// 
*****************************************************************************
+// deliverWatchers
+class Mock_deliverWatchers: public Mock{
+public:
+    Mock_deliverWatchers(){mock_=this;}
+    virtual ~Mock_deliverWatchers(){mock_=0;}
+    
+    virtual void call(zhandle_t* zh,int type,int state, const char* path){}
+    static Mock_deliverWatchers* mock_;
+};
+
+class DeliverWatchersWrapper;
+class WatcherDeliveryTracker{
+public:
+    // filters deliveries by state and type
+    WatcherDeliveryTracker(int type,int state,bool 
terminateCompletionThread=true);
+    ~WatcherDeliveryTracker();
+    
+    // if thread termination was requested (see the ctor params),
+    // this function will wait for the I/O and completion threads to 
+    // terminate before returning a SyncedBoolCondition instance
+    SyncedBoolCondition isWatcherProcessingCompleted() const;
+    void resetDeliveryCounter();
+    SyncedIntegerEqual deliveryCounterEquals(int expected) const;
+private:
+    DeliverWatchersWrapper* deliveryWrapper_;
+};
+
+// 
*****************************************************************************
 // a zookeeper Stat wrapper
 struct NodeStat: public Stat
 {
@@ -220,6 +293,8 @@
     virtual ~Response(){}
     
     virtual void setXID(int32_t xid){}
+    // this method is used by the ZookeeperServer class to serialize 
+    // the instance of Response
     virtual std::string toString() const =0;
 };
 
@@ -259,6 +334,41 @@
     Stat stat_;
 };
 
+// zoo_exists(), zoo_set() response
+class ZooStatResponse: public Response
+{
+public:
+    ZooStatResponse(int32_t xid=0,int rc=ZOK,const Stat& stat=NodeStat())
+        :xid_(xid),rc_(rc),stat_(stat)
+    {
+    }
+    virtual std::string toString() const;
+    virtual void setXID(int32_t xid) {xid_=xid;}
+    
+private:
+    int32_t xid_;
+    int rc_;
+    Stat stat_;
+};
+
+// zoo_get_children()
+class ZooGetChildrenResponse: public Response
+{
+public:
+    typedef std::vector<std::string> StringVector;
+    ZooGetChildrenResponse(const StringVector& v,int rc=ZOK):
+        xid_(0),strings_(v),rc_(rc)
+    {
+    }
+    
+    virtual std::string toString() const;
+    virtual void setXID(int32_t xid) {xid_=xid;}
+
+    int32_t xid_;
+    StringVector strings_;
+    int rc_;
+};
+
 // PING response
 class PingResponse: public Response
 {
@@ -326,12 +436,12 @@
     // this is a trigger that gets reset back to false
     // a connect request will return a non-matching session id thus causing 
     // the client throw SESSION_EXPIRED
-    bool sessionExpired;
+    volatile bool sessionExpired;
     void returnSessionExpired(){ sessionExpired=true; }
     
-    // this is a trigger that gets reset back to false
+    // this is a one-shot trigger that gets reset back to false
     // next recv call will return 0 length, thus simulating a connecton loss
-    bool connectionLost;
+    volatile bool connectionLost;
     void setConnectionLost() {connectionLost=true;}
     
     // recv

Modified: hadoop/zookeeper/trunk/src/c/tests/wrappers.opt
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/wrappers.opt?rev=679557&r1=679556&r2=679557&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/wrappers.opt (original)
+++ hadoop/zookeeper/trunk/src/c/tests/wrappers.opt Thu Jul 24 14:46:30 2008
@@ -2,3 +2,5 @@
 -Wl,--wrap -Wl,free
 -Wl,--wrap -Wl,flush_send_queue
 -Wl,--wrap -Wl,get_xid
+-Wl,--wrap -Wl,deliverWatchers
+-Wl,--wrap -Wl,activateWatcher


Reply via email to