Revision: 6946
          http://playerstage.svn.sourceforge.net/playerstage/?rev=6946&view=rev
Author:   thjc
Date:     2008-08-09 02:30:28 +0000 (Sat, 09 Aug 2008)

Log Message:
-----------
Fixed some obscure timing issues in libplayertcp/remote driver/filewatcher code 
which only surfaced with specific use of the passthrough driver.
Also fixed the loop iteration in the wait call of filewatcher.

Modified Paths:
--------------
    code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc
    code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc
    code/player/branches/release-2-1-patches/libplayertcp/playertcp.h
    code/player/branches/release-2-1-patches/libplayertcp/remote_driver.cc
    code/player/branches/release-2-1-patches/libplayertcp/remote_driver.h
    code/player/branches/release-2-1-patches/server/server.cc

Modified: code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc       
2008-08-07 13:48:09 UTC (rev 6945)
+++ code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc       
2008-08-09 02:30:28 UTC (rev 6946)
@@ -45,6 +45,7 @@
        Lock();
        if (WatchedFilesArrayCount == 0)
        {
+               PLAYER_ERROR("File watcher wait called with no files to watch");
                Unlock();
                return 0;
        }
@@ -75,20 +76,25 @@
        struct timeval t;
        t.tv_sec = static_cast<int> (floor(Timeout));
        t.tv_usec = static_cast<int> ((Timeout - static_cast<int> 
(floor(Timeout))) * 1e6);
-       Unlock();
 
        int ret = select (maxfd+1,&ReadFds,&WriteFds,&ExceptFds,&t);
 
        if (ret < 0)
        {
                PLAYER_ERROR2("Select called failed in File Watcher: %d 
%s",errno,strerror(errno));
+               Unlock();
                return ret;
        }
+       else if (ret == 0)
+       {
+               Unlock();
+               return 0;
+       }
 
        int queueless_count = 0;
+       int match_count = 0;
 
-       Lock();
-       for (unsigned int ii = 0; ii < WatchedFilesArrayCount && 
static_cast<int> (ii) < maxfd; ++ii)
+       for (unsigned int ii = 0; ii < WatchedFilesArrayCount && ret > 
match_count; ++ii)
        {
                int fd = WatchedFiles[ii].fd;
                QueuePointer &q = WatchedFiles[ii].queue;
@@ -98,18 +104,26 @@
                                        (WatchedFiles[ii].Write && 
FD_ISSET(fd,&WriteFds)) ||
                                        (WatchedFiles[ii].Except && 
FD_ISSET(fd,&ExceptFds)))
                        {
+                               match_count++;
                                if (q != NULL)
                                {
                                        q->DataAvailable();
                                }
                                else
+                               {
                                        queueless_count++;
+                               }
 
                        }
                }
        }
        Unlock();
 
+       if (ret != match_count)
+       {
+               PLAYER_ERROR1("Failed to match %d file descriptors in select 
results",ret - match_count);
+       }
+
        return queueless_count;
 }
 
@@ -125,32 +139,27 @@
        Lock();
        // find the first available file descriptor
        struct fd_driver_pair *next_entry = NULL;
-       if (WatchedFilesArrayCount < WatchedFilesArraySize)
+               // first see if there is an empty spot in the list
+       for (unsigned int ii = 0; ii < WatchedFilesArrayCount; ++ii)
        {
-               next_entry = &WatchedFiles[WatchedFilesArrayCount];
-               WatchedFilesArrayCount++;
+               if (WatchedFiles[ii].fd < 0)
+               {
+                       next_entry = &WatchedFiles[ii];
+                       break;
+               }
        }
-       else
+       if (next_entry == NULL)
        {
-               // first see if there is an empty spot in the list
-               for (unsigned int ii = 0; ii < WatchedFilesArrayCount; ++ii)
+               if (WatchedFilesArrayCount >= WatchedFilesArraySize)
                {
-                       if (WatchedFiles[ii].fd < 0)
-                       {
-                               next_entry = &WatchedFiles[ii];
-                               break;
-                       }
-               }
-               if (next_entry == NULL)
-               {
                        // otherwise we allocate some more room for the array
                        size_t orig_size = WatchedFilesArraySize;
                        WatchedFilesArraySize*=2;
                        WatchedFiles = reinterpret_cast<struct fd_driver_pair 
*> (realloc(WatchedFiles,sizeof(WatchedFiles[0])*WatchedFilesArraySize));
                        
memset(&WatchedFiles[orig_size],0,sizeof(WatchedFiles[0])*(WatchedFilesArraySize-orig_size));
-                       next_entry = &WatchedFiles[WatchedFilesArrayCount];
-
                }
+               next_entry = &WatchedFiles[WatchedFilesArrayCount];
+               WatchedFilesArrayCount++;
        }
 
        next_entry->fd = fd;
@@ -158,6 +167,7 @@
        next_entry->Read = WatchRead;
        next_entry->Write = WatchWrite;
        next_entry->Except = WatchExcept;
+
        Unlock();
        return 0;
 }

Modified: code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc
===================================================================
--- code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc  
2008-08-07 13:48:09 UTC (rev 6945)
+++ code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc  
2008-08-09 02:30:28 UTC (rev 6946)
@@ -214,7 +214,9 @@
                      bool have_lock)
 {
   if(!have_lock)
+  {
     Lock();
+  }
 
   unsigned char data[PLAYER_IDENT_STRLEN];
 
@@ -254,7 +256,6 @@
   // set up for later use by global file watcher
   fileWatcher->AddFileWatch(this->client_ufds[j].fd);
 
-
   // Create an outgoing queue for this client
   this->clients[j].queue =
           QueuePointer(true,PLAYER_MSGQUEUE_DEFAULT_MAXLEN);
@@ -329,7 +330,9 @@
   }
 
   if(!num_accepts)
+  {
     return(0);
+  }
 
   for(int i=0; (i<num_listeners) && (num_accepts>0); i++)
   {
@@ -395,9 +398,9 @@
     }
   }
   free(this->clients[cli].dev_subs);
+  fileWatcher->RemoveFileWatch(this->clients[cli].fd);
   if(close(this->clients[cli].fd) < 0)
     PLAYER_WARN1("close() failed: %s", strerror(errno));
-  fileWatcher->RemoveFileWatch(this->clients[cli].fd);
 
   this->clients[cli].fd = -1;
   this->clients[cli].valid = 0;

Modified: code/player/branches/release-2-1-patches/libplayertcp/playertcp.h
===================================================================
--- code/player/branches/release-2-1-patches/libplayertcp/playertcp.h   
2008-08-07 13:48:09 UTC (rev 6945)
+++ code/player/branches/release-2-1-patches/libplayertcp/playertcp.h   
2008-08-09 02:30:28 UTC (rev 6946)
@@ -136,13 +136,13 @@
     /** Total size of @p decode_readbuffer */
     int decode_readbuffersize;
 
+    void Lock();
+    void Unlock();
 
   public:
     PlayerTCP();
     ~PlayerTCP();
 
-    void Lock();
-    void Unlock();
 
     static void InitGlobals(void);
 
@@ -150,7 +150,7 @@
 
     int Listen(int* ports, int num_ports, int* new_ports=NULL);
     int Listen(int port);
-    QueuePointer AddClient(struct sockaddr_in* cliaddr, 
+    QueuePointer AddClient(struct sockaddr_in* cliaddr,
                             unsigned int local_host,
                             unsigned int local_port,
                             int newsock,

Modified: code/player/branches/release-2-1-patches/libplayertcp/remote_driver.cc
===================================================================
--- code/player/branches/release-2-1-patches/libplayertcp/remote_driver.cc      
2008-08-07 13:48:09 UTC (rev 6945)
+++ code/player/branches/release-2-1-patches/libplayertcp/remote_driver.cc      
2008-08-09 02:30:28 UTC (rev 6946)
@@ -2,7 +2,6 @@
  *  Player - One Hell of a Robot Server
  *  Copyright (C) <insert dates here>
  *     <insert author's name(s) here>
- *                      
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
@@ -55,24 +54,20 @@
 #include <libplayerxdr/playerxdr.h>
 #include "remote_driver.h"
 
-TCPRemoteDriver::TCPRemoteDriver(player_devaddr_t addr, void* arg) 
-        : Driver(NULL, 0, false, PLAYER_MSGQUEUE_DEFAULT_MAXLEN)
+TCPRemoteDriver::TCPRemoteDriver(player_devaddr_t addr, void* arg)
+        : Driver(NULL, 0, false, PLAYER_MSGQUEUE_DEFAULT_MAXLEN),
+          sock(-1),
+          setup_timeout (DEFAULT_SETUP_TIMEOUT)
 {
-  if(arg)
-    this->ptcp = (PlayerTCP*)arg;
-  else
-    this->ptcp = NULL;
-
-  this->sock = -1;
-  this->setup_timeout = DEFAULT_SETUP_TIMEOUT;
+    ptcp = reinterpret_cast<PlayerTCP*> (arg);
 }
 
 TCPRemoteDriver::~TCPRemoteDriver()
 {
 }
 
-int 
-TCPRemoteDriver::Setup() 
+int
+TCPRemoteDriver::Setup()
 {
   struct sockaddr_in server;
   char banner[PLAYER_IDENT_STRLEN];
@@ -94,7 +89,7 @@
   }
 
 
-  // Construct socket 
+  // Construct socket
   this->sock = socket(PF_INET, SOCK_STREAM, 0);
   if(this->sock < 0)
   {
@@ -106,17 +101,17 @@
   server.sin_addr.s_addr = this->device_addr.host;
   server.sin_port = htons(this->device_addr.robot);
 
-  // Connect the socket 
+  // Connect the socket
   if(connect(this->sock, (struct sockaddr*)&server, sizeof(server)) < 0)
   {
     PLAYER_ERROR3("connect call on [%s:%u] failed with error [%s]",
                 this->ipaddr,
-                this->device_addr.robot, 
+                this->device_addr.robot,
                 strerror(errno));
     return(-1);
   }
 
-  printf("connected to: %s:%u\n",
+  PLAYER_MSG2(2,"connected to: %s:%u\n",
          this->ipaddr, this->device_addr.robot);
 
   // make the socket non-blocking
@@ -143,7 +138,7 @@
   numread=0;
   while(numread < (int)sizeof(banner))
   {
-    if((thisnumread = read(this->sock, banner+numread, 
+    if((thisnumread = read(this->sock, banner+numread,
                            sizeof(banner)-numread)) < 0)
     {
       if(errno != EAGAIN)
@@ -171,13 +166,14 @@
 
   // Add this socket for monitoring
   this->kill_flag = 0;
-  this->queue = this->ptcp->AddClient(NULL, 
-                                      this->device_addr.host, 
-                                      this->device_addr.robot, 
-                                      this->sock, 
+  this->queue = this->ptcp->AddClient(NULL,
+                                      this->device_addr.host,
+                                      this->device_addr.robot,
+                                      this->sock,
                                       false,
                                       &this->kill_flag,
                                       (this->ptcp->thread == pthread_self()));
+
   PLAYER_MSG0(5,"Adding new TCPRemoteDriver to the PlayerTCP Client 
List...Success");
 
   return(0);
@@ -206,7 +202,7 @@
   req.driver_name_count = 0;
 
   // Encode the body
-  if((encode_msglen = 
+  if((encode_msglen =
       player_device_req_pack(buf + PLAYERXDR_MSGHDR_SIZE,
                              sizeof(buf)-PLAYERXDR_MSGHDR_SIZE,
                              &req, PLAYERXDR_ENCODE)) < 0)
@@ -233,7 +229,7 @@
 
   while(numbytes < encode_msglen)
   {
-    if((thisnumbytes = write(this->sock, buf+numbytes, 
+    if((thisnumbytes = write(this->sock, buf+numbytes,
                              encode_msglen-numbytes)) < 0)
     {
       if(errno != EAGAIN)
@@ -258,13 +254,13 @@
   // In any case, explicitly unsubscribing is just a courtesy.
   if(mode == PLAYER_CLOSE_MODE)
     return(0);
-  
+
   // Receive the response header
   GlobalTime->GetTimeDouble(&t1);
   numbytes = 0;
   while(numbytes < PLAYERXDR_MSGHDR_SIZE)
   {
-    if((thisnumbytes = read(this->sock, buf+numbytes, 
+    if((thisnumbytes = read(this->sock, buf+numbytes,
                             PLAYERXDR_MSGHDR_SIZE-numbytes)) < 0)
     {
       if(errno != EAGAIN)
@@ -291,7 +287,7 @@
   }
 
   // Is it the right kind of message?
-  if(!Message::MatchMessage(&hdr, 
+  if(!Message::MatchMessage(&hdr,
                             PLAYER_MSGTYPE_RESP_ACK,
                             PLAYER_PLAYER_REQ_DEV,
                             hdr.addr))
@@ -305,7 +301,7 @@
   numbytes = 0;
   while(numbytes < (int)hdr.size)
   {
-    if((thisnumbytes = read(this->sock, buf+PLAYERXDR_MSGHDR_SIZE+numbytes, 
+    if((thisnumbytes = read(this->sock, buf+PLAYERXDR_MSGHDR_SIZE+numbytes,
                             hdr.size-numbytes)) < 0)
     {
       if(errno != EAGAIN)
@@ -351,8 +347,8 @@
   return(0);
 }
 
-int 
-TCPRemoteDriver::Shutdown() 
+int
+TCPRemoteDriver::Shutdown()
 {
   // Have we already been killed?
   if(!this->kill_flag)
@@ -366,27 +362,12 @@
     this->ptcp->DeleteClient(this->queue,
                              (this->ptcp->thread == pthread_self()));
   }
-  return(0); 
+  return(0);
 }
 
-void 
-TCPRemoteDriver::Update()
-{
-  if(this->ptcp->thread == pthread_self())
-  {
-    //this->ptcp->Read(0,true);
-    this->ptcp->Lock();
-    this->ptcp->ReadClient(this->queue);
-    this->ptcp->Unlock();
-  }
-  this->ProcessMessages();
-  if(this->ptcp->thread == pthread_self())
-    this->ptcp->Write(false);
-}
-
-int 
-TCPRemoteDriver::ProcessMessage(QueuePointer &resp_queue, 
-                                player_msghdr * hdr, 
+int
+TCPRemoteDriver::ProcessMessage(QueuePointer &resp_queue,
+                                player_msghdr * hdr,
                                 void * data)
 {
   // Is it data from the remote device?
@@ -444,7 +425,7 @@
     }
   }
   // Forward response (success or failure) from the laser
-  else if((Message::MatchMessage(hdr, PLAYER_MSGTYPE_RESP_ACK, 
+  else if((Message::MatchMessage(hdr, PLAYER_MSGTYPE_RESP_ACK,
                                  -1, this->device_addr)) ||
           (Message::MatchMessage(hdr, PLAYER_MSGTYPE_RESP_NACK,
                                  -1, this->device_addr)))
@@ -461,7 +442,7 @@
 }
 
 
-Driver* 
+Driver*
 TCPRemoteDriver::TCPRemoteDriver_Init(player_devaddr_t addr, void* arg)
 {
   return((Driver*)(new TCPRemoteDriver(addr, arg)));

Modified: code/player/branches/release-2-1-patches/libplayertcp/remote_driver.h
===================================================================
--- code/player/branches/release-2-1-patches/libplayertcp/remote_driver.h       
2008-08-07 13:48:09 UTC (rev 6945)
+++ code/player/branches/release-2-1-patches/libplayertcp/remote_driver.h       
2008-08-09 02:30:28 UTC (rev 6946)
@@ -2,8 +2,8 @@
  *  Player - One Hell of a Robot Server
  *  Copyright (C) <insert dates here>
  *     <insert author's name(s) here>
- *                      
  *
+ *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
  *  the Free Software Foundation; either version 2 of the License, or
@@ -63,9 +63,8 @@
 
     virtual int Setup();
     virtual int Shutdown();
-    virtual void Update();
-    virtual int ProcessMessage(QueuePointer & resp_queue, 
-                               player_msghdr * hdr, 
+    virtual int ProcessMessage(QueuePointer & resp_queue,
+                               player_msghdr * hdr,
                                void * data);
 
     static Driver* TCPRemoteDriver_Init(player_devaddr_t addr, void* arg);

Modified: code/player/branches/release-2-1-patches/server/server.cc
===================================================================
--- code/player/branches/release-2-1-patches/server/server.cc   2008-08-07 
13:48:09 UTC (rev 6945)
+++ code/player/branches/release-2-1-patches/server/server.cc   2008-08-09 
02:30:28 UTC (rev 6946)
@@ -295,26 +295,27 @@
   while(!player_quit)
   {
     // wait until something other than driver requested watches happens
-    fileWatcher->Wait();
-
-    if(ptcp->Accept(0) < 0)
+    int numready = fileWatcher->Wait(0.01); // run at a minimum of 100Hz for 
other drivers
+    if (numready > 0)
     {
-      PLAYER_ERROR("failed while accepting new TCP connections");
-      break;
-    }
+      if(ptcp->Accept(0) < 0)
+      {
+        PLAYER_ERROR("failed while accepting new TCP connections");
+        break;
+      }
 
-    if(ptcp->Read(100,false) < 0)
-    {
-      PLAYER_ERROR("failed while reading from TCP clients");
-      break;
-    }
+      if(ptcp->Read(0,false) < 0)
+      {
+        PLAYER_ERROR("failed while reading from TCP clients");
+        break;
+      }
 
-    if(pudp->Read(0) < 0)
-    {
-      PLAYER_ERROR("failed while reading from UDP clients");
-      break;
+      if(pudp->Read(0) < 0)
+      {
+        PLAYER_ERROR("failed while reading from UDP clients");
+        break;
+      }
     }
-
     deviceTable->UpdateDevices();
 
     if(ptcp->Write(false) < 0)


This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.

-------------------------------------------------------------------------
This SF.Net email is sponsored by the Moblin Your Move Developer's challenge
Build the coolest Linux based applications with Moblin SDK & win great prizes
Grand prize is a trip for two to an Open Source event anywhere in the world
http://moblin-contest.org/redirect.php?banner_id=100&url=/
_______________________________________________
Playerstage-commit mailing list
Playerstage-commit@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/playerstage-commit

Reply via email to