Modified: hadoop/zookeeper/trunk/src/c/tests/ZKMocks.cc URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/ZKMocks.cc?rev=679557&r1=679556&r2=679557&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/c/tests/ZKMocks.cc (original) +++ hadoop/zookeeper/trunk/src/c/tests/ZKMocks.cc Thu Jul 24 14:46:30 2008 @@ -66,17 +66,26 @@ // ***************************************************************************** // watcher action implementation -void activeWatcher(zhandle_t *zh, int type, int state, const char *path){ - if(zh==0 || zoo_get_context(zh)==0) return; - WatcherAction* action=(WatcherAction*)zoo_get_context(zh); - action->setWatcherTriggered(); +void activeWatcher(zhandle_t *zh, int type, int state, const char *path,void* ctx){ + if(zh==0 || ctx==0) return; + WatcherAction* action=(WatcherAction*)ctx; - if(type==SESSION_EVENT && state==EXPIRED_SESSION_STATE) - action->onSessionExpired(zh); - if(type==CHANGED_EVENT) + if(type==SESSION_EVENT){ + if(state==EXPIRED_SESSION_STATE) + action->onSessionExpired(zh); + else if(state==CONNECTING_STATE) + action->onConnectionLost(zh); + else if(state==CONNECTED_STATE) + action->onConnectionEstablished(zh); + }else if(type==CHANGED_EVENT) action->onNodeValueChanged(zh,path); + else if(type==DELETED_EVENT) + action->onNodeDeleted(zh,path); + else if(type==CHILD_EVENT) + action->onChildChanged(zh,path); // TODO: implement for the rest of the event types // ... 
+ action->setWatcherTriggered(); } SyncedBoolCondition WatcherAction::isWatcherTriggered() const{ return SyncedBoolCondition(triggered_,mx_); @@ -145,6 +154,167 @@ Mock_get_xid* Mock_get_xid::mock_=0; //****************************************************************************** +// activateWatcher mock + +DECLARE_WRAPPER(void,activateWatcher,(watcher_registration_t* reg, int rc)) +{ + if(!Mock_activateWatcher::mock_){ + CALL_REAL(activateWatcher,(reg,rc)); + }else{ + Mock_activateWatcher::mock_->call(reg,rc); + } +} +Mock_activateWatcher* Mock_activateWatcher::mock_=0; + +class ActivateWatcherWrapper: public Mock_activateWatcher{ +public: + ActivateWatcherWrapper():ctx_(0),activated_(false){} + + virtual void call(watcher_registration_t* reg, int rc){ + CALL_REAL(activateWatcher,(reg,rc)); + synchronized(mx_); + if(reg->context==ctx_){ + activated_=true; + ctx_=0; + } + } + + void setContext(void* ctx){ + synchronized(mx_); + ctx_=ctx; + activated_=false; + } + + SyncedBoolCondition isActivated() const{ + return SyncedBoolCondition(activated_,mx_); + } + mutable Mutex mx_; + void* ctx_; + bool activated_; +}; + +WatcherActivationTracker::WatcherActivationTracker(): + wrapper_(new ActivateWatcherWrapper) +{ +} + +WatcherActivationTracker::~WatcherActivationTracker(){ + delete wrapper_; +} + +void WatcherActivationTracker::track(void* ctx){ + wrapper_->setContext(ctx); +} + +SyncedBoolCondition WatcherActivationTracker::isWatcherActivated() const{ + return wrapper_->isActivated(); +} + +//****************************************************************************** +// +DECLARE_WRAPPER(void,deliverWatchers,(zhandle_t* zh,int type,int state, const char* path)) +{ + if(!Mock_deliverWatchers::mock_){ + CALL_REAL(deliverWatchers,(zh,type,state,path)); + }else{ + Mock_deliverWatchers::mock_->call(zh,type,state,path); + } +} + +Mock_deliverWatchers* Mock_deliverWatchers::mock_=0; + +struct RefCounterValue{ + RefCounterValue(zhandle_t* const& zh,int32_t 
expectedCounter,Mutex& mx): + zh_(zh),expectedCounter_(expectedCounter),mx_(mx){} + bool operator()() const{ + { + synchronized(mx_); + if(zh_==0) + return false; + } + return inc_ref_counter(zh_,0)==expectedCounter_; + } + zhandle_t* const& zh_; + int32_t expectedCounter_; + Mutex& mx_; +}; + + +class DeliverWatchersWrapper: public Mock_deliverWatchers{ +public: + DeliverWatchersWrapper(int type,int state,bool terminate): + type_(type),state_(state), + allDelivered_(false),terminate_(terminate),zh_(0),deliveryCounter_(0){} + virtual void call(zhandle_t* zh,int type,int state, const char* path){ + { + synchronized(mx_); + zh_=zh; + allDelivered_=false; + } + CALL_REAL(deliverWatchers,(zh,type,state,path)); + if(type_==type && state_==state){ + if(terminate_){ + // prevent zhandle_t from being prematurely destroyed; + // this will also ensure that zookeeper_close() cleans up the thread + // resources by calling finish_adaptor() + inc_ref_counter(zh,1); + terminateZookeeperThreads(zh); + } + synchronized(mx_); + allDelivered_=true; + deliveryCounter_++; + } + } + SyncedBoolCondition isDelivered() const{ + if(terminate_){ + int i=ensureCondition(RefCounterValue(zh_,1,mx_),1000); + assert(i<1000); + } + return SyncedBoolCondition(allDelivered_,mx_); + } + void resetDeliveryCounter(){ + synchronized(mx_); + deliveryCounter_=0; + } + SyncedIntegerEqual deliveryCounterEquals(int expected) const{ + if(terminate_){ + int i=ensureCondition(RefCounterValue(zh_,1,mx_),1000); + assert(i<1000); + } + return SyncedIntegerEqual(deliveryCounter_,expected,mx_); + } + int type_; + int state_; + mutable Mutex mx_; + bool allDelivered_; + bool terminate_; + zhandle_t* zh_; + int deliveryCounter_; +}; + +WatcherDeliveryTracker::WatcherDeliveryTracker( + int type,int state,bool terminateCompletionThread): + deliveryWrapper_(new DeliverWatchersWrapper( + type,state,terminateCompletionThread)){ +} + +WatcherDeliveryTracker::~WatcherDeliveryTracker(){ + delete deliveryWrapper_; +} + 
+SyncedBoolCondition WatcherDeliveryTracker::isWatcherProcessingCompleted() const{ + return deliveryWrapper_->isDelivered(); +} + +void WatcherDeliveryTracker::resetDeliveryCounter(){ + deliveryWrapper_->resetDeliveryCounter(); +} + +SyncedIntegerEqual WatcherDeliveryTracker::deliveryCounterEquals(int expected) const{ + return deliveryWrapper_->deliveryCounterEquals(expected); +} + +//****************************************************************************** // string HandshakeResponse::toString() const { string buf; @@ -185,6 +355,45 @@ return res; } +string ZooStatResponse::toString() const{ + oarchive* oa=create_buffer_oarchive(); + + ReplyHeader h = {xid_,1,rc_}; + serialize_ReplyHeader(oa, "hdr", &h); + + SetDataResponse resp; + resp.stat=stat_; + serialize_SetDataResponse(oa, "reply", &resp); + int32_t len=htonl(get_buffer_len(oa)); + string res((char*)&len,sizeof(len)); + res.append(get_buffer(oa),get_buffer_len(oa)); + + close_buffer_oarchive(&oa,1); + return res; +} + +string ZooGetChildrenResponse::toString() const{ + oarchive* oa=create_buffer_oarchive(); + + ReplyHeader h = {xid_,1,rc_}; + serialize_ReplyHeader(oa, "hdr", &h); + + GetChildrenResponse resp; + // populate the string vector + allocate_String_vector(&resp.children,strings_.size()); + for(int i=0;i<(int)strings_.size();++i) + resp.children.data[i]=strdup(strings_[i].c_str()); + serialize_GetChildrenResponse(oa, "reply", &resp); + deallocate_GetChildrenResponse(&resp); + + int32_t len=htonl(get_buffer_len(oa)); + string res((char*)&len,sizeof(len)); + res.append(get_buffer(oa),get_buffer_len(oa)); + + close_buffer_oarchive(&oa,1); + return res; +} + string ZNodeEvent::toString() const{ oarchive* oa=create_buffer_oarchive(); struct WatcherEvent evt = {type_,0,(char*)path_.c_str()}; @@ -219,7 +428,7 @@ // Zookeeper server simulator // bool ZookeeperServer::hasMoreRecv() const{ - return recvHasMore.get()!=0; + return recvHasMore.get()!=0 || connectionLost; } ssize_t 
ZookeeperServer::callRecv(int s,void *buf,size_t len,int flags){ @@ -295,3 +504,8 @@ gettimeofday(&zh->last_recv,0); gettimeofday(&zh->last_send,0); } + +void terminateZookeeperThreads(zhandle_t* zh){ + // this will cause the zookeeper threads to terminate + zh->close_requested=1; +}
Modified: hadoop/zookeeper/trunk/src/c/tests/ZKMocks.h URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/ZKMocks.h?rev=679557&r1=679556&r2=679557&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/c/tests/ZKMocks.h (original) +++ hadoop/zookeeper/trunk/src/c/tests/ZKMocks.h Thu Jul 24 14:46:30 2008 @@ -32,6 +32,11 @@ // Async API tests! void forceConnected(zhandle_t* zh); +/** + * Gracefully terminates zookeeper I/O and completion threads. + */ +void terminateZookeeperThreads(zhandle_t* zh); + // ***************************************************************************** // Abstract watcher action struct SyncedBoolCondition; @@ -42,7 +47,11 @@ virtual ~WatcherAction(){} virtual void onSessionExpired(zhandle_t*){} + virtual void onConnectionEstablished(zhandle_t*){} + virtual void onConnectionLost(zhandle_t*){} virtual void onNodeValueChanged(zhandle_t*,const char* path){} + virtual void onNodeDeleted(zhandle_t*,const char* path){} + virtual void onChildChanged(zhandle_t*,const char* path){} SyncedBoolCondition isWatcherTriggered() const; void setWatcherTriggered(){ @@ -57,7 +66,7 @@ // zh->context is a pointer to a WatcherAction instance // based on the event type and state, the watcher calls a specific watcher // action method -void activeWatcher(zhandle_t *zh, int type, int state, const char *path); +void activeWatcher(zhandle_t *zh, int type, int state, const char *path,void* ctx); // ***************************************************************************** // a set of async completion signatures @@ -104,6 +113,7 @@ zhandle_t* zh_; }; +// a synchronized boolean condition struct SyncedBoolCondition{ SyncedBoolCondition(const bool& cond,Mutex& mx):cond_(cond),mx_(mx){} bool operator()() const{ @@ -113,6 +123,20 @@ const bool& cond_; Mutex& mx_; }; + +// a synchronized integer comparison +struct SyncedIntegerEqual{ + SyncedIntegerEqual(const int& cond,int expected,Mutex& 
mx): + cond_(cond),expected_(expected),mx_(mx){} + bool operator()() const{ + synchronized(mx_); + return cond_==expected_; + } + const int& cond_; + const int expected_; + Mutex& mx_; +}; + // ***************************************************************************** // make sure to call zookeeper_close() even in presence of exceptions struct CloseFinally{ @@ -158,7 +182,6 @@ // ***************************************************************************** // flush_send_queue - class Mock_flush_send_queue: public Mock { public: @@ -177,7 +200,6 @@ // ***************************************************************************** // get_xid - class Mock_get_xid: public Mock { public: @@ -194,6 +216,57 @@ }; // ***************************************************************************** +// activateWatcher +class Mock_activateWatcher: public Mock{ +public: + Mock_activateWatcher(){mock_=this;} + virtual ~Mock_activateWatcher(){mock_=0;} + + virtual void call(watcher_registration_t* reg, int rc){} + static Mock_activateWatcher* mock_; +}; + +class ActivateWatcherWrapper; +class WatcherActivationTracker{ +public: + WatcherActivationTracker(); + ~WatcherActivationTracker(); + + void track(void* ctx); + SyncedBoolCondition isWatcherActivated() const; +private: + ActivateWatcherWrapper* wrapper_; +}; + +// ***************************************************************************** +// deliverWatchers +class Mock_deliverWatchers: public Mock{ +public: + Mock_deliverWatchers(){mock_=this;} + virtual ~Mock_deliverWatchers(){mock_=0;} + + virtual void call(zhandle_t* zh,int type,int state, const char* path){} + static Mock_deliverWatchers* mock_; +}; + +class DeliverWatchersWrapper; +class WatcherDeliveryTracker{ +public: + // filters deliveries by state and type + WatcherDeliveryTracker(int type,int state,bool terminateCompletionThread=true); + ~WatcherDeliveryTracker(); + + // if thread termination is requested (see the ctor params) + // this function will wait for the 
I/O and completion threads to + // terminate before returning a SyncedBoolCondition instance + SyncedBoolCondition isWatcherProcessingCompleted() const; + void resetDeliveryCounter(); + SyncedIntegerEqual deliveryCounterEquals(int expected) const; +private: + DeliverWatchersWrapper* deliveryWrapper_; +}; + +// ***************************************************************************** // a zookeeper Stat wrapper struct NodeStat: public Stat { @@ -220,6 +293,8 @@ virtual ~Response(){} virtual void setXID(int32_t xid){} + // this method is used by the ZookeeperServer class to serialize + // the instance of Response virtual std::string toString() const =0; }; @@ -259,6 +334,41 @@ Stat stat_; }; +// zoo_exists(), zoo_set() response +class ZooStatResponse: public Response +{ +public: + ZooStatResponse(int32_t xid=0,int rc=ZOK,const Stat& stat=NodeStat()) + :xid_(xid),rc_(rc),stat_(stat) + { + } + virtual std::string toString() const; + virtual void setXID(int32_t xid) {xid_=xid;} + +private: + int32_t xid_; + int rc_; + Stat stat_; +}; + +// zoo_get_children() +class ZooGetChildrenResponse: public Response +{ +public: + typedef std::vector<std::string> StringVector; + ZooGetChildrenResponse(const StringVector& v,int rc=ZOK): + xid_(0),strings_(v),rc_(rc) + { + } + + virtual std::string toString() const; + virtual void setXID(int32_t xid) {xid_=xid;} + + int32_t xid_; + StringVector strings_; + int rc_; +}; + // PING response class PingResponse: public Response { @@ -326,12 +436,12 @@ // this is a trigger that gets reset back to false // a connect request will return a non-matching session id thus causing // the client throw SESSION_EXPIRED - bool sessionExpired; + volatile bool sessionExpired; void returnSessionExpired(){ sessionExpired=true; } - // this is a trigger that gets reset back to false + // this is a one shot trigger that gets reset back to false // next recv call will return 0 length, thus simulating a connecton loss - bool connectionLost; + volatile bool 
connectionLost; void setConnectionLost() {connectionLost=true;} // recv Modified: hadoop/zookeeper/trunk/src/c/tests/wrappers.opt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/wrappers.opt?rev=679557&r1=679556&r2=679557&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/c/tests/wrappers.opt (original) +++ hadoop/zookeeper/trunk/src/c/tests/wrappers.opt Thu Jul 24 14:46:30 2008 @@ -2,3 +2,5 @@ -Wl,--wrap -Wl,free -Wl,--wrap -Wl,flush_send_queue -Wl,--wrap -Wl,get_xid +-Wl,--wrap -Wl,deliverWatchers +-Wl,--wrap -Wl,activateWatcher