Update of /cvsroot/playerstage/code/player/libplayercore
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv10759/libplayercore

Modified Files:
        device.cc message.cc message.h 
Log Message:
fixed push/pull, and vfh

Index: message.cc
===================================================================
RCS file: /cvsroot/playerstage/code/player/libplayercore/message.cc,v
retrieving revision 1.31
retrieving revision 1.32
diff -C2 -d -r1.31 -r1.32
*** message.cc  3 Dec 2007 01:31:14 -0000       1.31
--- message.cc  7 Dec 2007 01:50:14 -0000       1.32
***************
*** 83,87 ****
    RefCount = rhs.RefCount;
    (*RefCount)++;
-   ready = false;
  
    pthread_mutex_unlock(rhs.Lock);
--- 83,86 ----
***************
*** 193,196 ****
--- 192,196 ----
    this->pull = false;
    this->data_requested = false;
+   this->data_delivered = false;
  }
  
***************
*** 198,204 ****
  {
    // clear the queue
!   Message* msg;
!   while((msg = this->Pop()))
!     delete msg;
  
    // clear the list of replacement rules
--- 198,209 ----
  {
    // clear the queue
!   MessageQueueElement *e, *n;
!   for(e = this->head; e;)
!   {
!     delete e->msg;
!     n = e->next
!     delete e;
!     e = n;
!   }
  
    // clear the list of replacement rules
***************
*** 278,288 ****
    if((hdr->type == PLAYER_MSGTYPE_REQ) ||
       (hdr->type == PLAYER_MSGTYPE_RESP_ACK) ||
!      (hdr->type == PLAYER_MSGTYPE_RESP_NACK) ||
!      (hdr->type == PLAYER_MSGTYPE_SYNCH))
      return(PLAYER_PLAYER_MSG_REPLACE_RULE_ACCEPT);
    // Replace data and command according to the this->Replace flag
    else if((hdr->type == PLAYER_MSGTYPE_DATA) ||
            (hdr->type == PLAYER_MSGTYPE_CMD))
!     return(this->Replace ? PLAYER_PLAYER_MSG_REPLACE_RULE_REPLACE : 
PLAYER_PLAYER_MSG_REPLACE_RULE_ACCEPT);
    else
    {
--- 283,298 ----
    if((hdr->type == PLAYER_MSGTYPE_REQ) ||
       (hdr->type == PLAYER_MSGTYPE_RESP_ACK) ||
!      (hdr->type == PLAYER_MSGTYPE_RESP_NACK))
      return(PLAYER_PLAYER_MSG_REPLACE_RULE_ACCEPT);
    // Replace data and command according to the this->Replace flag
    else if((hdr->type == PLAYER_MSGTYPE_DATA) ||
            (hdr->type == PLAYER_MSGTYPE_CMD))
!   {
!     // If we're over the queue length limit, ignore the new data/cmd message
!     if(this->Length >= this->Maxlen)
!       return(PLAYER_PLAYER_MSG_REPLACE_RULE_IGNORE);
!     else
!       return(this->Replace ? PLAYER_PLAYER_MSG_REPLACE_RULE_REPLACE : 
PLAYER_PLAYER_MSG_REPLACE_RULE_ACCEPT);
!   }
    else
    {
***************
*** 365,409 ****
  }
  
- void MessageQueue::MarkAllReady (void)
- {
-   MessageQueueElement *current;
-   bool dataready=false;
- 
-   if (!pull)
-     return;   // No need to mark ready if not in pull mode
- 
-   Lock();
-   // Mark all messages in the queue ready
-   for (current = head; current != NULL; current = current->next)
-   {
-     player_msghdr_t* hdr;
-     hdr = current->msg->GetHeader();
-     // Only need to mark data and command messages.  Requests and replies
-     // get marked as they are pushed in
-     if((hdr->type == PLAYER_MSGTYPE_DATA) || (hdr->type == 
PLAYER_MSGTYPE_CMD))
-     {
-       current->msg->SetReady ();
-       dataready=true;
-     }
-   }
-   Unlock ();
-   // Only if there was at least one message, push a sync message onto the end
-   if(dataready)
-   {
-     struct player_msghdr syncHeader;
-     syncHeader.addr.host = 0;
-     syncHeader.addr.robot = 0;
-     syncHeader.addr.interf = PLAYER_PLAYER_CODE;
-     syncHeader.addr.index = 0;
-     syncHeader.type = PLAYER_MSGTYPE_SYNCH;
-     syncHeader.subtype = 0;
-     Message syncMessage (syncHeader, 0, 0);
-     syncMessage.SetReady ();
-     this->data_requested = false;
-     Push (syncMessage, true);
-   }
- }
- 
- 
  void
  MessageQueue::ClearFilter(void)
--- 375,378 ----
***************
*** 423,429 ****
    pthread_mutex_unlock(&this->condMutex);
  }
  
  bool
! MessageQueue::Push(Message & msg, bool UseReserved)
  {
    player_msghdr_t* hdr;
--- 392,466 ----
    pthread_mutex_unlock(&this->condMutex);
  }
+     
+ /// @brief Set the data_requested flag
+ void 
+ MessageQueue::SetDataRequested(bool d, bool haveLock)
+ { 
+   if(!haveLock)
+     this->Lock();
+   this->data_requested = d; 
+   this->data_delivered = false; 
+   if(!haveLock)
+     this->Unlock();
+ }
+ 
+ /// Put it at the front of the queue, without checking replacement rules
+ /// or size limits.
+ /// This method is used to insert responses to requests for data.
+ /// The caller may have already called Lock() on this queue
+ void
+ MessageQueue::PushFront(Message & msg, bool haveLock)
+ {
+   if(!haveLock)
+     this->Lock();
+   MessageQueueElement* newelt = new MessageQueueElement();
+   newelt->msg = new Message(msg);
+   if(!this->tail)
+   {
+     this->head = this->tail = newelt;
+     newelt->prev = newelt->next = NULL;
+   }
+   else
+   {
+     newelt->prev = NULL;
+     newelt->next = this->head;
+     this->head->prev = newelt;
+     this->head = newelt;
+   }
+   this->Length++;
+   if(!haveLock)
+     this->Unlock();
+ }
+ 
+ /// Push a message at the back of the queue, without checking replacement 
+ /// rules or size limits.
+ /// This method is used internally to insert most messages.
+ /// The caller may have already called Lock() on this queue
+ void
+ MessageQueue::PushBack(Message & msg, bool haveLock)
+ {
+   if(!haveLock)
+     this->Lock();
+   MessageQueueElement* newelt = new MessageQueueElement();
+   newelt->msg = new Message(msg);
+   if(!this->tail)
+   {
+     this->head = this->tail = newelt;
+     newelt->prev = newelt->next = NULL;
+   }
+   else
+   {
+     this->tail->next = newelt;
+     newelt->prev = this->tail;
+     newelt->next = NULL;
+     this->tail = newelt;
+   }
+   this->Length++;
+   if(!haveLock)
+     this->Unlock();
+ }
  
  bool
! MessageQueue::Push(Message & msg)
  {
    player_msghdr_t* hdr;
***************
*** 445,451 ****
          el = el->prev)
      {
!       // Ignore ready flag outside pull mode - when a client goes to pull 
mode, only the client's
!       // queue is set to pull, so other queues will still ignore ready flag
!       if(el->msg->Compare(msg) && (!el->msg->Ready () || !pull))
        {
          this->Remove(el);
--- 482,486 ----
          el = el->prev)
      {
!       if(el->msg->Compare(msg))
        {
          this->Remove(el);
***************
*** 456,500 ****
      }
    }
!   // Are we over the limit?
!   if(this->Length >= this->Maxlen - (UseReserved ? 0 : 1))
!   {
!     PLAYER_WARN("tried to push onto a full message queue");
!     this->Unlock();
!     if(!this->filter_on)
!       this->DataAvailable();
!     return(false);
!   }
!   else
!   {
!     MessageQueueElement* newelt = new MessageQueueElement();
!     newelt->msg = new Message(msg);
!     if (!pull || (newelt->msg->GetHeader ()->type != PLAYER_MSGTYPE_DATA &&
!                 newelt->msg->GetHeader ()->type != PLAYER_MSGTYPE_CMD))
!     {
!       // If not in pull mode, or message is not data/cmd, set ready to true 
immediatly
!       newelt->msg->SetReady ();
!     }
!     if(!this->tail)
!     {
!       this->head = this->tail = newelt;
!       newelt->prev = newelt->next = NULL;
!     }
!     else
!     {
!       this->tail->next = newelt;
!       newelt->prev = this->tail;
!       newelt->next = NULL;
!       this->tail = newelt;
!     }
!     this->Length++;
!     this->Unlock();
!     if(!this->filter_on || this->Filter(msg))
!       this->DataAvailable();
!     
!     // If the client has a pending request for data, try to fulfill it
!     if(this->pull && this->data_requested)
!       this->MarkAllReady();
!     return(true);
!   }
  }
  
--- 491,508 ----
      }
    }
! 
!   this->PushBack(msg,true);
! 
!   // If it was a response, then mark it , to prompt
!   // processing of the queue.
!   if(!this->data_requested &&
!      (hdr->type == PLAYER_MSGTYPE_RESP_ACK) ||
!      (hdr->type == PLAYER_MSGTYPE_RESP_NACK))
!     this->SetDataRequested(true,true);
! 
!   this->Unlock();
!   if(!this->filter_on || this->Filter(msg))
!     this->DataAvailable();
!   return(true);
  }
  
***************
*** 504,511 ****
    MessageQueueElement* el;
    Lock();
!   if(this->Empty())
    {
!     Unlock();
!     return(NULL);
    }
  
--- 512,531 ----
    MessageQueueElement* el;
    Lock();
! 
!   // Look for the last response in the queue, starting at the tail.
!   // If any responses are pending, we always send all messages up to and
!   // including the last response.
!   MessageQueueElement* resp_el=NULL;
!   if(!this->filter_on && !this->data_requested)
    {
!     for(el = this->tail; el; el = el->prev)
!     {
!       if((el->msg->GetHeader()->type == PLAYER_MSGTYPE_RESP_NACK) ||
!          (el->msg->GetHeader()->type == PLAYER_MSGTYPE_RESP_ACK))
!       {
!         resp_el = el;
!         break;
!       }
!     }
    }
  
***************
*** 514,519 ****
    for(el = this->head; el; el = el->next)
    {
!     if(!this->filter_on || this->Filter(*el->msg))
      {
        this->Remove(el);
        Unlock();
--- 534,546 ----
    for(el = this->head; el; el = el->next)
    {
!     if(resp_el ||
!        ((!this->filter_on || this->Filter(*el->msg)) && 
!         (!this->pull || this->data_requested)))
      {
+       if(el == resp_el)
+         resp_el = NULL;
+       if(this->data_requested &&
+          (el->msg->GetHeader()->type == PLAYER_MSGTYPE_DATA))
+         this->data_delivered = true;
        this->Remove(el);
        Unlock();
***************
*** 523,555 ****
      }
    }
-   Unlock();
-   return(NULL);
- }
  
! Message* MessageQueue::PopReady (void)
! {
!   MessageQueueElement* el;
!   Lock();
!   if(this->Empty())
    {
      Unlock();
!     return(NULL);
    }
! 
!   // start at the head and traverse the queue until a filter-friendly
!   // message (that is marked ready if in pull mode) is found
!   for(el = this->head; el; el = el->next)
    {
!     if((!this->filter_on || this->Filter(*el->msg)) && ((pull && 
el->msg->Ready ()) || !pull))
!     {
!       this->Remove(el);
!       Unlock();
!       Message* retmsg = el->msg;
!       delete el;
!       return(retmsg);
!     }
    }
-   Unlock();
-   return(NULL);
  }
  
--- 550,576 ----
      }
    }
  
!   // queue is empty.  if that data had been requested in pull mode, and
!   // some has been delivered, then mark the end of this frame with a 
!   // sync message
!   if(this->pull && this->data_requested && this->data_delivered)
    {
+     struct player_msghdr syncHeader;
+     syncHeader.addr.host = 0;
+     syncHeader.addr.robot = 0;
+     syncHeader.addr.interf = PLAYER_PLAYER_CODE;
+     syncHeader.addr.index = 0;
+     syncHeader.type = PLAYER_MSGTYPE_SYNCH;
+     syncHeader.subtype = 0;
+     Message* syncMessage = new Message(syncHeader, 0, 0);
+     this->SetDataRequested(false,true);
      Unlock();
!     return(syncMessage);
    }
!   else
    {
!     Unlock();
!     return(NULL);
    }
  }
  

Index: message.h
===================================================================
RCS file: /cvsroot/playerstage/code/player/libplayercore/message.h,v
retrieving revision 1.22
retrieving revision 1.23
diff -C2 -d -r1.22 -r1.23
*** message.h   2 Nov 2007 18:09:20 -0000       1.22
--- message.h   7 Dec 2007 01:50:14 -0000       1.23
***************
*** 170,177 ****
      /// Decrement ref count
      void DecRef();
-     /// Set ready to send
-     void SetReady ()            { ready = true; }
-     /// Check if ready to send
-     bool Ready (void) const     { return ready; }
  
      /// queue to which any response to this message should be directed
--- 170,173 ----
***************
*** 192,197 ****
      /// Used to lock access to Data.
      pthread_mutex_t * Lock;
-     /// Marks if the message is ready to be sent to the client
-     bool ready;
  };
  
--- 188,191 ----
***************
*** 328,336 ****
      bool Empty() { return(this->head == NULL); }
      /** Push a message onto the queue.  Returns the success state of the Push
!     operation (true if successful, false otherwise).
!     UseReserved should only be set true when pushing sync
!     messages on to the queue. If UseReserved is false then a single message 
slot
!     is reserved on the queue for a sync message */
!     bool Push(Message& msg, bool UseReserved = false);
  
      /** Pop a message off the queue.
--- 322,339 ----
      bool Empty() { return(this->head == NULL); }
      /** Push a message onto the queue.  Returns the success state of the Push
!     operation (true if successful, false otherwise). */
!     bool Push(Message& msg);
! 
!     /// Put it at the front of the queue, without checking replacement rules
!     /// or size limits.
!     /// This method is used to insert responses to requests for data.
!     /// The caller may have already called Lock() on this queue
!     void PushFront(Message & msg, bool haveLock);
! 
!     /// Push a message at the back of the queue, without checking replacement 
!     /// rules or size limits.
!     /// This method is used internally to insert most messages.
!     /// The caller may have already called Lock() on this queue
!     void PushBack(Message & msg, bool haveLock);
  
      /** Pop a message off the queue.
***************
*** 338,346 ****
      Returns pointer to said message, or NULL if the queue is empty */
      Message* Pop();
-     /** Pop a ready message off the queue.
-     Pop the head (i.e., the first-inserted) message from the queue.
-     If pull_flag is true, only pop messages marked as ready.
-     Returns pointer to said message, or NULL if the queue is empty */
-     Message* PopReady (void);
      /** Set the @p Replace flag, which governs whether data and command
      messages of the same subtype from the same device are replaced in
--- 341,344 ----
***************
*** 378,386 ****
      void SetFilter(int host, int robot, int interf, int index,
                     int type, int subtype);
!     /** Set the @p pull flag, which if true then requires messages to be 
marked
!     as ready before they will be sent to the client. */
      void SetPull (bool _pull) { this->pull = _pull; }
-     /// Mark all messages in the queue as ready to be sent
-     void MarkAllReady (void);
  
      /// @brief Get current length of queue, in elements.
--- 376,381 ----
      void SetFilter(int host, int robot, int interf, int index,
                     int type, int subtype);
!     /** Set the @p pull flag */
      void SetPull (bool _pull) { this->pull = _pull; }
  
      /// @brief Get current length of queue, in elements.
***************
*** 388,392 ****
  
      /// @brief Set the data_requested flag
!     void SetDataRequested(bool d) { this->data_requested = d; }
    private:
      /// @brief Lock the mutex associated with this queue.
--- 383,388 ----
  
      /// @brief Set the data_requested flag
!     void SetDataRequested(bool d, bool haveLock);
! 
    private:
      /// @brief Lock the mutex associated with this queue.
***************
*** 428,431 ****
--- 424,429 ----
      /// been delivered
      bool data_requested;
+     /// @brief Flag that data was sent (in PULL mode)
+     bool data_delivered;
  };
  

Index: device.cc
===================================================================
RCS file: /cvsroot/playerstage/code/player/libplayercore/device.cc,v
retrieving revision 1.23
retrieving revision 1.24
diff -C2 -d -r1.23 -r1.24
*** device.cc   14 Nov 2007 21:07:49 -0000      1.23
--- device.cc   7 Dec 2007 01:50:14 -0000       1.24
***************
*** 325,328 ****
--- 325,331 ----
      // got something else
      resp_queue->ClearFilter();
+     printf("%d:%d:%d:%d\n",
+            hdr->addr.interf, hdr->addr.index,
+            hdr->type, hdr->subtype);
      PLAYER_ERROR("got unexpected message");
      delete msg;


-------------------------------------------------------------------------
SF.Net email is sponsored by:
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://sourceforge.net/services/buy/index.php
_______________________________________________
Playerstage-commit mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/playerstage-commit

Reply via email to