Revision: 6946 http://playerstage.svn.sourceforge.net/playerstage/?rev=6946&view=rev Author: thjc Date: 2008-08-09 02:30:28 +0000 (Sat, 09 Aug 2008)
Log Message: ----------- Fixed some obscure timing issues in libplayertcp/remote driver/filewatcher code which only surfaced with specific use of the passthrough driver. Also fixed the loop iteration in the wait call of the filewatcher. Modified Paths: -------------- code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc code/player/branches/release-2-1-patches/libplayertcp/playertcp.h code/player/branches/release-2-1-patches/libplayertcp/remote_driver.cc code/player/branches/release-2-1-patches/libplayertcp/remote_driver.h code/player/branches/release-2-1-patches/server/server.cc Modified: code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc =================================================================== --- code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc 2008-08-07 13:48:09 UTC (rev 6945) +++ code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc 2008-08-09 02:30:28 UTC (rev 6946) @@ -45,6 +45,7 @@ Lock(); if (WatchedFilesArrayCount == 0) { + PLAYER_ERROR("File watcher wait called with no files to watch"); Unlock(); return 0; } @@ -75,20 +76,25 @@ struct timeval t; t.tv_sec = static_cast<int> (floor(Timeout)); t.tv_usec = static_cast<int> ((Timeout - static_cast<int> (floor(Timeout))) * 1e6); - Unlock(); int ret = select (maxfd+1,&ReadFds,&WriteFds,&ExceptFds,&t); if (ret < 0) { PLAYER_ERROR2("Select called failed in File Watcher: %d %s",errno,strerror(errno)); + Unlock(); return ret; } + else if (ret == 0) + { + Unlock(); + return 0; + } int queueless_count = 0; + int match_count = 0; - Lock(); - for (unsigned int ii = 0; ii < WatchedFilesArrayCount && static_cast<int> (ii) < maxfd; ++ii) + for (unsigned int ii = 0; ii < WatchedFilesArrayCount && ret > match_count; ++ii) { int fd = WatchedFiles[ii].fd; QueuePointer &q = WatchedFiles[ii].queue; @@ -98,18 +104,26 @@ (WatchedFiles[ii].Write && FD_ISSET(fd,&WriteFds)) || (WatchedFiles[ii].Except 
&& FD_ISSET(fd,&ExceptFds))) { + match_count++; if (q != NULL) { q->DataAvailable(); } else + { queueless_count++; + } } } } Unlock(); + if (ret != match_count) + { + PLAYER_ERROR1("Failed to match %d file descriptors in select results",ret - match_count); + } + return queueless_count; } @@ -125,32 +139,27 @@ Lock(); // find the first available file descriptor struct fd_driver_pair *next_entry = NULL; - if (WatchedFilesArrayCount < WatchedFilesArraySize) + // first see if there is an empty spot in the list + for (unsigned int ii = 0; ii < WatchedFilesArrayCount; ++ii) { - next_entry = &WatchedFiles[WatchedFilesArrayCount]; - WatchedFilesArrayCount++; + if (WatchedFiles[ii].fd < 0) + { + next_entry = &WatchedFiles[ii]; + break; + } } - else + if (next_entry == NULL) { - // first see if there is an empty spot in the list - for (unsigned int ii = 0; ii < WatchedFilesArrayCount; ++ii) + if (WatchedFilesArrayCount >= WatchedFilesArraySize) { - if (WatchedFiles[ii].fd < 0) - { - next_entry = &WatchedFiles[ii]; - break; - } - } - if (next_entry == NULL) - { // otherwise we allocate some more room for the array size_t orig_size = WatchedFilesArraySize; WatchedFilesArraySize*=2; WatchedFiles = reinterpret_cast<struct fd_driver_pair *> (realloc(WatchedFiles,sizeof(WatchedFiles[0])*WatchedFilesArraySize)); memset(&WatchedFiles[orig_size],0,sizeof(WatchedFiles[0])*(WatchedFilesArraySize-orig_size)); - next_entry = &WatchedFiles[WatchedFilesArrayCount]; - } + next_entry = &WatchedFiles[WatchedFilesArrayCount]; + WatchedFilesArrayCount++; } next_entry->fd = fd; @@ -158,6 +167,7 @@ next_entry->Read = WatchRead; next_entry->Write = WatchWrite; next_entry->Except = WatchExcept; + Unlock(); return 0; } Modified: code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc =================================================================== --- code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc 2008-08-07 13:48:09 UTC (rev 6945) +++ 
code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc 2008-08-09 02:30:28 UTC (rev 6946) @@ -214,7 +214,9 @@ bool have_lock) { if(!have_lock) + { Lock(); + } unsigned char data[PLAYER_IDENT_STRLEN]; @@ -254,7 +256,6 @@ // set up for later use by global file watcher fileWatcher->AddFileWatch(this->client_ufds[j].fd); - // Create an outgoing queue for this client this->clients[j].queue = QueuePointer(true,PLAYER_MSGQUEUE_DEFAULT_MAXLEN); @@ -329,7 +330,9 @@ } if(!num_accepts) + { return(0); + } for(int i=0; (i<num_listeners) && (num_accepts>0); i++) { @@ -395,9 +398,9 @@ } } free(this->clients[cli].dev_subs); + fileWatcher->RemoveFileWatch(this->clients[cli].fd); if(close(this->clients[cli].fd) < 0) PLAYER_WARN1("close() failed: %s", strerror(errno)); - fileWatcher->RemoveFileWatch(this->clients[cli].fd); this->clients[cli].fd = -1; this->clients[cli].valid = 0; Modified: code/player/branches/release-2-1-patches/libplayertcp/playertcp.h =================================================================== --- code/player/branches/release-2-1-patches/libplayertcp/playertcp.h 2008-08-07 13:48:09 UTC (rev 6945) +++ code/player/branches/release-2-1-patches/libplayertcp/playertcp.h 2008-08-09 02:30:28 UTC (rev 6946) @@ -136,13 +136,13 @@ /** Total size of @p decode_readbuffer */ int decode_readbuffersize; + void Lock(); + void Unlock(); public: PlayerTCP(); ~PlayerTCP(); - void Lock(); - void Unlock(); static void InitGlobals(void); @@ -150,7 +150,7 @@ int Listen(int* ports, int num_ports, int* new_ports=NULL); int Listen(int port); - QueuePointer AddClient(struct sockaddr_in* cliaddr, + QueuePointer AddClient(struct sockaddr_in* cliaddr, unsigned int local_host, unsigned int local_port, int newsock, Modified: code/player/branches/release-2-1-patches/libplayertcp/remote_driver.cc =================================================================== --- code/player/branches/release-2-1-patches/libplayertcp/remote_driver.cc 2008-08-07 13:48:09 UTC (rev 6945) +++ 
code/player/branches/release-2-1-patches/libplayertcp/remote_driver.cc 2008-08-09 02:30:28 UTC (rev 6946) @@ -2,7 +2,6 @@ * Player - One Hell of a Robot Server * Copyright (C) <insert dates here> * <insert author's name(s) here> - * * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -55,24 +54,20 @@ #include <libplayerxdr/playerxdr.h> #include "remote_driver.h" -TCPRemoteDriver::TCPRemoteDriver(player_devaddr_t addr, void* arg) - : Driver(NULL, 0, false, PLAYER_MSGQUEUE_DEFAULT_MAXLEN) +TCPRemoteDriver::TCPRemoteDriver(player_devaddr_t addr, void* arg) + : Driver(NULL, 0, false, PLAYER_MSGQUEUE_DEFAULT_MAXLEN), + sock(-1), + setup_timeout (DEFAULT_SETUP_TIMEOUT) { - if(arg) - this->ptcp = (PlayerTCP*)arg; - else - this->ptcp = NULL; - - this->sock = -1; - this->setup_timeout = DEFAULT_SETUP_TIMEOUT; + ptcp = reinterpret_cast<PlayerTCP*> (arg); } TCPRemoteDriver::~TCPRemoteDriver() { } -int -TCPRemoteDriver::Setup() +int +TCPRemoteDriver::Setup() { struct sockaddr_in server; char banner[PLAYER_IDENT_STRLEN]; @@ -94,7 +89,7 @@ } - // Construct socket + // Construct socket this->sock = socket(PF_INET, SOCK_STREAM, 0); if(this->sock < 0) { @@ -106,17 +101,17 @@ server.sin_addr.s_addr = this->device_addr.host; server.sin_port = htons(this->device_addr.robot); - // Connect the socket + // Connect the socket if(connect(this->sock, (struct sockaddr*)&server, sizeof(server)) < 0) { PLAYER_ERROR3("connect call on [%s:%u] failed with error [%s]", this->ipaddr, - this->device_addr.robot, + this->device_addr.robot, strerror(errno)); return(-1); } - printf("connected to: %s:%u\n", + PLAYER_MSG2(2,"connected to: %s:%u\n", this->ipaddr, this->device_addr.robot); // make the socket non-blocking @@ -143,7 +138,7 @@ numread=0; while(numread < (int)sizeof(banner)) { - if((thisnumread = read(this->sock, banner+numread, + if((thisnumread = read(this->sock, banner+numread, 
sizeof(banner)-numread)) < 0) { if(errno != EAGAIN) @@ -171,13 +166,14 @@ // Add this socket for monitoring this->kill_flag = 0; - this->queue = this->ptcp->AddClient(NULL, - this->device_addr.host, - this->device_addr.robot, - this->sock, + this->queue = this->ptcp->AddClient(NULL, + this->device_addr.host, + this->device_addr.robot, + this->sock, false, &this->kill_flag, (this->ptcp->thread == pthread_self())); + PLAYER_MSG0(5,"Adding new TCPRemoteDriver to the PlayerTCP Client List...Success"); return(0); @@ -206,7 +202,7 @@ req.driver_name_count = 0; // Encode the body - if((encode_msglen = + if((encode_msglen = player_device_req_pack(buf + PLAYERXDR_MSGHDR_SIZE, sizeof(buf)-PLAYERXDR_MSGHDR_SIZE, &req, PLAYERXDR_ENCODE)) < 0) @@ -233,7 +229,7 @@ while(numbytes < encode_msglen) { - if((thisnumbytes = write(this->sock, buf+numbytes, + if((thisnumbytes = write(this->sock, buf+numbytes, encode_msglen-numbytes)) < 0) { if(errno != EAGAIN) @@ -258,13 +254,13 @@ // In any case, explicitly unsubscribing is just a courtesy. if(mode == PLAYER_CLOSE_MODE) return(0); - + // Receive the response header GlobalTime->GetTimeDouble(&t1); numbytes = 0; while(numbytes < PLAYERXDR_MSGHDR_SIZE) { - if((thisnumbytes = read(this->sock, buf+numbytes, + if((thisnumbytes = read(this->sock, buf+numbytes, PLAYERXDR_MSGHDR_SIZE-numbytes)) < 0) { if(errno != EAGAIN) @@ -291,7 +287,7 @@ } // Is it the right kind of message? - if(!Message::MatchMessage(&hdr, + if(!Message::MatchMessage(&hdr, PLAYER_MSGTYPE_RESP_ACK, PLAYER_PLAYER_REQ_DEV, hdr.addr)) @@ -305,7 +301,7 @@ numbytes = 0; while(numbytes < (int)hdr.size) { - if((thisnumbytes = read(this->sock, buf+PLAYERXDR_MSGHDR_SIZE+numbytes, + if((thisnumbytes = read(this->sock, buf+PLAYERXDR_MSGHDR_SIZE+numbytes, hdr.size-numbytes)) < 0) { if(errno != EAGAIN) @@ -351,8 +347,8 @@ return(0); } -int -TCPRemoteDriver::Shutdown() +int +TCPRemoteDriver::Shutdown() { // Have we already been killed? 
if(!this->kill_flag) @@ -366,27 +362,12 @@ this->ptcp->DeleteClient(this->queue, (this->ptcp->thread == pthread_self())); } - return(0); + return(0); } -void -TCPRemoteDriver::Update() -{ - if(this->ptcp->thread == pthread_self()) - { - //this->ptcp->Read(0,true); - this->ptcp->Lock(); - this->ptcp->ReadClient(this->queue); - this->ptcp->Unlock(); - } - this->ProcessMessages(); - if(this->ptcp->thread == pthread_self()) - this->ptcp->Write(false); -} - -int -TCPRemoteDriver::ProcessMessage(QueuePointer &resp_queue, - player_msghdr * hdr, +int +TCPRemoteDriver::ProcessMessage(QueuePointer &resp_queue, + player_msghdr * hdr, void * data) { // Is it data from the remote device? @@ -444,7 +425,7 @@ } } // Forward response (success or failure) from the laser - else if((Message::MatchMessage(hdr, PLAYER_MSGTYPE_RESP_ACK, + else if((Message::MatchMessage(hdr, PLAYER_MSGTYPE_RESP_ACK, -1, this->device_addr)) || (Message::MatchMessage(hdr, PLAYER_MSGTYPE_RESP_NACK, -1, this->device_addr))) @@ -461,7 +442,7 @@ } -Driver* +Driver* TCPRemoteDriver::TCPRemoteDriver_Init(player_devaddr_t addr, void* arg) { return((Driver*)(new TCPRemoteDriver(addr, arg))); Modified: code/player/branches/release-2-1-patches/libplayertcp/remote_driver.h =================================================================== --- code/player/branches/release-2-1-patches/libplayertcp/remote_driver.h 2008-08-07 13:48:09 UTC (rev 6945) +++ code/player/branches/release-2-1-patches/libplayertcp/remote_driver.h 2008-08-09 02:30:28 UTC (rev 6946) @@ -2,8 +2,8 @@ * Player - One Hell of a Robot Server * Copyright (C) <insert dates here> * <insert author's name(s) here> - * * + * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or @@ -63,9 +63,8 @@ virtual int Setup(); virtual int Shutdown(); - virtual void Update(); - virtual int ProcessMessage(QueuePointer 
& resp_queue, - player_msghdr * hdr, + virtual int ProcessMessage(QueuePointer & resp_queue, + player_msghdr * hdr, void * data); static Driver* TCPRemoteDriver_Init(player_devaddr_t addr, void* arg); Modified: code/player/branches/release-2-1-patches/server/server.cc =================================================================== --- code/player/branches/release-2-1-patches/server/server.cc 2008-08-07 13:48:09 UTC (rev 6945) +++ code/player/branches/release-2-1-patches/server/server.cc 2008-08-09 02:30:28 UTC (rev 6946) @@ -295,26 +295,27 @@ while(!player_quit) { // wait until something other than driver requested watches happens - fileWatcher->Wait(); - - if(ptcp->Accept(0) < 0) + int numready = fileWatcher->Wait(0.01); // run at a minimum of 100Hz for other drivers + if (numready > 0) { - PLAYER_ERROR("failed while accepting new TCP connections"); - break; - } + if(ptcp->Accept(0) < 0) + { + PLAYER_ERROR("failed while accepting new TCP connections"); + break; + } - if(ptcp->Read(100,false) < 0) - { - PLAYER_ERROR("failed while reading from TCP clients"); - break; - } + if(ptcp->Read(0,false) < 0) + { + PLAYER_ERROR("failed while reading from TCP clients"); + break; + } - if(pudp->Read(0) < 0) - { - PLAYER_ERROR("failed while reading from UDP clients"); - break; + if(pudp->Read(0) < 0) + { + PLAYER_ERROR("failed while reading from UDP clients"); + break; + } } - deviceTable->UpdateDevices(); if(ptcp->Write(false) < 0) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. 
------------------------------------------------------------------------- This SF.Net email is sponsored by the Moblin Your Move Developer's challenge Build the coolest Linux based applications with Moblin SDK & win great prizes Grand prize is a trip for two to an Open Source event anywhere in the world http://moblin-contest.org/redirect.php?banner_id=100&url=/ _______________________________________________ Playerstage-commit mailing list Playerstage-commit@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/playerstage-commit