Update of /cvsroot/playerstage/code/player/libplayercore
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv10759/libplayercore
Modified Files:
device.cc message.cc message.h
Log Message:
fixed push/pull, and vfh
Index: message.cc
===================================================================
RCS file: /cvsroot/playerstage/code/player/libplayercore/message.cc,v
retrieving revision 1.31
retrieving revision 1.32
diff -C2 -d -r1.31 -r1.32
*** message.cc 3 Dec 2007 01:31:14 -0000 1.31
--- message.cc 7 Dec 2007 01:50:14 -0000 1.32
***************
*** 83,87 ****
RefCount = rhs.RefCount;
(*RefCount)++;
- ready = false;
pthread_mutex_unlock(rhs.Lock);
--- 83,86 ----
***************
*** 193,196 ****
--- 192,196 ----
this->pull = false;
this->data_requested = false;
+ this->data_delivered = false;
}
***************
*** 198,204 ****
{
// clear the queue
! Message* msg;
! while((msg = this->Pop()))
! delete msg;
// clear the list of replacement rules
--- 198,209 ----
{
// clear the queue
! MessageQueueElement *e, *n;
! for(e = this->head; e;)
! {
! delete e->msg;
! n = e->next;
! delete e;
! e = n;
! }
// clear the list of replacement rules
***************
*** 278,288 ****
if((hdr->type == PLAYER_MSGTYPE_REQ) ||
(hdr->type == PLAYER_MSGTYPE_RESP_ACK) ||
! (hdr->type == PLAYER_MSGTYPE_RESP_NACK) ||
! (hdr->type == PLAYER_MSGTYPE_SYNCH))
return(PLAYER_PLAYER_MSG_REPLACE_RULE_ACCEPT);
// Replace data and command according to the this->Replace flag
else if((hdr->type == PLAYER_MSGTYPE_DATA) ||
(hdr->type == PLAYER_MSGTYPE_CMD))
! return(this->Replace ? PLAYER_PLAYER_MSG_REPLACE_RULE_REPLACE :
PLAYER_PLAYER_MSG_REPLACE_RULE_ACCEPT);
else
{
--- 283,298 ----
if((hdr->type == PLAYER_MSGTYPE_REQ) ||
(hdr->type == PLAYER_MSGTYPE_RESP_ACK) ||
! (hdr->type == PLAYER_MSGTYPE_RESP_NACK))
return(PLAYER_PLAYER_MSG_REPLACE_RULE_ACCEPT);
// Replace data and command according to the this->Replace flag
else if((hdr->type == PLAYER_MSGTYPE_DATA) ||
(hdr->type == PLAYER_MSGTYPE_CMD))
! {
! // If we're over the queue length limit, ignore the new data/cmd message
! if(this->Length >= this->Maxlen)
! return(PLAYER_PLAYER_MSG_REPLACE_RULE_IGNORE);
! else
! return(this->Replace ? PLAYER_PLAYER_MSG_REPLACE_RULE_REPLACE :
PLAYER_PLAYER_MSG_REPLACE_RULE_ACCEPT);
! }
else
{
***************
*** 365,409 ****
}
- void MessageQueue::MarkAllReady (void)
- {
- MessageQueueElement *current;
- bool dataready=false;
-
- if (!pull)
- return; // No need to mark ready if not in pull mode
-
- Lock();
- // Mark all messages in the queue ready
- for (current = head; current != NULL; current = current->next)
- {
- player_msghdr_t* hdr;
- hdr = current->msg->GetHeader();
- // Only need to mark data and command messages. Requests and replies
- // get marked as they are pushed in
- if((hdr->type == PLAYER_MSGTYPE_DATA) || (hdr->type == PLAYER_MSGTYPE_CMD))
- {
- current->msg->SetReady ();
- dataready=true;
- }
- }
- Unlock ();
- // Only if there was at least one message, push a sync message onto the end
- if(dataready)
- {
- struct player_msghdr syncHeader;
- syncHeader.addr.host = 0;
- syncHeader.addr.robot = 0;
- syncHeader.addr.interf = PLAYER_PLAYER_CODE;
- syncHeader.addr.index = 0;
- syncHeader.type = PLAYER_MSGTYPE_SYNCH;
- syncHeader.subtype = 0;
- Message syncMessage (syncHeader, 0, 0);
- syncMessage.SetReady ();
- this->data_requested = false;
- Push (syncMessage, true);
- }
- }
-
-
void
MessageQueue::ClearFilter(void)
--- 375,378 ----
***************
*** 423,429 ****
pthread_mutex_unlock(&this->condMutex);
}
bool
! MessageQueue::Push(Message & msg, bool UseReserved)
{
player_msghdr_t* hdr;
--- 392,466 ----
pthread_mutex_unlock(&this->condMutex);
}
+
+ /// @brief Set the data_requested flag
+ void
+ MessageQueue::SetDataRequested(bool d, bool haveLock)
+ {
+ if(!haveLock)
+ this->Lock();
+ this->data_requested = d;
+ this->data_delivered = false;
+ if(!haveLock)
+ this->Unlock();
+ }
+
+ /// Put it at the front of the queue, without checking replacement rules
+ /// or size limits.
+ /// This method is used to insert responses to requests for data.
+ /// The caller may have already called Lock() on this queue
+ void
+ MessageQueue::PushFront(Message & msg, bool haveLock)
+ {
+ if(!haveLock)
+ this->Lock();
+ MessageQueueElement* newelt = new MessageQueueElement();
+ newelt->msg = new Message(msg);
+ if(!this->tail)
+ {
+ this->head = this->tail = newelt;
+ newelt->prev = newelt->next = NULL;
+ }
+ else
+ {
+ newelt->prev = NULL;
+ newelt->next = this->head;
+ this->head->prev = newelt;
+ this->head = newelt;
+ }
+ this->Length++;
+ if(!haveLock)
+ this->Unlock();
+ }
+
+ /// Push a message at the back of the queue, without checking replacement
+ /// rules or size limits.
+ /// This method is used internally to insert most messages.
+ /// The caller may have already called Lock() on this queue
+ void
+ MessageQueue::PushBack(Message & msg, bool haveLock)
+ {
+ if(!haveLock)
+ this->Lock();
+ MessageQueueElement* newelt = new MessageQueueElement();
+ newelt->msg = new Message(msg);
+ if(!this->tail)
+ {
+ this->head = this->tail = newelt;
+ newelt->prev = newelt->next = NULL;
+ }
+ else
+ {
+ this->tail->next = newelt;
+ newelt->prev = this->tail;
+ newelt->next = NULL;
+ this->tail = newelt;
+ }
+ this->Length++;
+ if(!haveLock)
+ this->Unlock();
+ }
bool
! MessageQueue::Push(Message & msg)
{
player_msghdr_t* hdr;
***************
*** 445,451 ****
el = el->prev)
{
! // Ignore ready flag outside pull mode - when a client goes to pull mode, only the client's
! // queue is set to pull, so other queues will still ignore ready flag
! if(el->msg->Compare(msg) && (!el->msg->Ready () || !pull))
{
this->Remove(el);
--- 482,486 ----
el = el->prev)
{
! if(el->msg->Compare(msg))
{
this->Remove(el);
***************
*** 456,500 ****
}
}
! // Are we over the limit?
! if(this->Length >= this->Maxlen - (UseReserved ? 0 : 1))
! {
! PLAYER_WARN("tried to push onto a full message queue");
! this->Unlock();
! if(!this->filter_on)
! this->DataAvailable();
! return(false);
! }
! else
! {
! MessageQueueElement* newelt = new MessageQueueElement();
! newelt->msg = new Message(msg);
! if (!pull || (newelt->msg->GetHeader ()->type != PLAYER_MSGTYPE_DATA &&
! newelt->msg->GetHeader ()->type != PLAYER_MSGTYPE_CMD))
! {
! // If not in pull mode, or message is not data/cmd, set ready to true immediatly
! newelt->msg->SetReady ();
! }
! if(!this->tail)
! {
! this->head = this->tail = newelt;
! newelt->prev = newelt->next = NULL;
! }
! else
! {
! this->tail->next = newelt;
! newelt->prev = this->tail;
! newelt->next = NULL;
! this->tail = newelt;
! }
! this->Length++;
! this->Unlock();
! if(!this->filter_on || this->Filter(msg))
! this->DataAvailable();
!
! // If the client has a pending request for data, try to fulfill it
! if(this->pull && this->data_requested)
! this->MarkAllReady();
! return(true);
! }
}
--- 491,508 ----
}
}
!
! this->PushBack(msg,true);
!
! // If it was a response, then mark data as requested, to prompt
! // processing of the queue.
! if(!this->data_requested &&
! (hdr->type == PLAYER_MSGTYPE_RESP_ACK) ||
! (hdr->type == PLAYER_MSGTYPE_RESP_NACK))
! this->SetDataRequested(true,true);
!
! this->Unlock();
! if(!this->filter_on || this->Filter(msg))
! this->DataAvailable();
! return(true);
}
***************
*** 504,511 ****
MessageQueueElement* el;
Lock();
! if(this->Empty())
{
! Unlock();
! return(NULL);
}
--- 512,531 ----
MessageQueueElement* el;
Lock();
!
! // Look for the last response in the queue, starting at the tail.
! // If any responses are pending, we always send all messages up to and
! // including the last response.
! MessageQueueElement* resp_el=NULL;
! if(!this->filter_on && !this->data_requested)
{
! for(el = this->tail; el; el = el->prev)
! {
! if((el->msg->GetHeader()->type == PLAYER_MSGTYPE_RESP_NACK) ||
! (el->msg->GetHeader()->type == PLAYER_MSGTYPE_RESP_ACK))
! {
! resp_el = el;
! break;
! }
! }
}
***************
*** 514,519 ****
for(el = this->head; el; el = el->next)
{
! if(!this->filter_on || this->Filter(*el->msg))
{
this->Remove(el);
Unlock();
--- 534,546 ----
for(el = this->head; el; el = el->next)
{
! if(resp_el ||
! ((!this->filter_on || this->Filter(*el->msg)) &&
! (!this->pull || this->data_requested)))
{
+ if(el == resp_el)
+ resp_el = NULL;
+ if(this->data_requested &&
+ (el->msg->GetHeader()->type == PLAYER_MSGTYPE_DATA))
+ this->data_delivered = true;
this->Remove(el);
Unlock();
***************
*** 523,555 ****
}
}
- Unlock();
- return(NULL);
- }
! Message* MessageQueue::PopReady (void)
! {
! MessageQueueElement* el;
! Lock();
! if(this->Empty())
{
Unlock();
! return(NULL);
}
!
! // start at the head and traverse the queue until a filter-friendly
! // message (that is marked ready if in pull mode) is found
! for(el = this->head; el; el = el->next)
{
! if((!this->filter_on || this->Filter(*el->msg)) && ((pull && el->msg->Ready ()) || !pull))
! {
! this->Remove(el);
! Unlock();
! Message* retmsg = el->msg;
! delete el;
! return(retmsg);
! }
}
- Unlock();
- return(NULL);
}
--- 550,576 ----
}
}
! // queue is empty. if that data had been requested in pull mode, and
! // some has been delivered, then mark the end of this frame with a
! // sync message
! if(this->pull && this->data_requested && this->data_delivered)
{
+ struct player_msghdr syncHeader;
+ syncHeader.addr.host = 0;
+ syncHeader.addr.robot = 0;
+ syncHeader.addr.interf = PLAYER_PLAYER_CODE;
+ syncHeader.addr.index = 0;
+ syncHeader.type = PLAYER_MSGTYPE_SYNCH;
+ syncHeader.subtype = 0;
+ Message* syncMessage = new Message(syncHeader, 0, 0);
+ this->SetDataRequested(false,true);
Unlock();
! return(syncMessage);
}
! else
{
! Unlock();
! return(NULL);
}
}
Index: message.h
===================================================================
RCS file: /cvsroot/playerstage/code/player/libplayercore/message.h,v
retrieving revision 1.22
retrieving revision 1.23
diff -C2 -d -r1.22 -r1.23
*** message.h 2 Nov 2007 18:09:20 -0000 1.22
--- message.h 7 Dec 2007 01:50:14 -0000 1.23
***************
*** 170,177 ****
/// Decrement ref count
void DecRef();
- /// Set ready to send
- void SetReady () { ready = true; }
- /// Check if ready to send
- bool Ready (void) const { return ready; }
/// queue to which any response to this message should be directed
--- 170,173 ----
***************
*** 192,197 ****
/// Used to lock access to Data.
pthread_mutex_t * Lock;
- /// Marks if the message is ready to be sent to the client
- bool ready;
};
--- 188,191 ----
***************
*** 328,336 ****
bool Empty() { return(this->head == NULL); }
/** Push a message onto the queue. Returns the success state of the Push
! operation (true if successful, false otherwise).
! UseReserved should only be set true when pushing sync
! messages on to the queue. If UseReserved is false then a single message slot
! is reserved on the queue for a sync message */
! bool Push(Message& msg, bool UseReserved = false);
/** Pop a message off the queue.
--- 322,339 ----
bool Empty() { return(this->head == NULL); }
/** Push a message onto the queue. Returns the success state of the Push
! operation (true if successful, false otherwise). */
! bool Push(Message& msg);
!
! /// Put it at the front of the queue, without checking replacement rules
! /// or size limits.
! /// This method is used to insert responses to requests for data.
! /// The caller may have already called Lock() on this queue
! void PushFront(Message & msg, bool haveLock);
!
! /// Push a message at the back of the queue, without checking replacement
! /// rules or size limits.
! /// This method is used internally to insert most messages.
! /// The caller may have already called Lock() on this queue
! void PushBack(Message & msg, bool haveLock);
/** Pop a message off the queue.
***************
*** 338,346 ****
Returns pointer to said message, or NULL if the queue is empty */
Message* Pop();
- /** Pop a ready message off the queue.
- Pop the head (i.e., the first-inserted) message from the queue.
- If pull_flag is true, only pop messages marked as ready.
- Returns pointer to said message, or NULL if the queue is empty */
- Message* PopReady (void);
/** Set the @p Replace flag, which governs whether data and command
messages of the same subtype from the same device are replaced in
--- 341,344 ----
***************
*** 378,386 ****
void SetFilter(int host, int robot, int interf, int index,
int type, int subtype);
! /** Set the @p pull flag, which if true then requires messages to be marked
! as ready before they will be sent to the client. */
void SetPull (bool _pull) { this->pull = _pull; }
- /// Mark all messages in the queue as ready to be sent
- void MarkAllReady (void);
/// @brief Get current length of queue, in elements.
--- 376,381 ----
void SetFilter(int host, int robot, int interf, int index,
int type, int subtype);
! /** Set the @p pull flag */
void SetPull (bool _pull) { this->pull = _pull; }
/// @brief Get current length of queue, in elements.
***************
*** 388,392 ****
/// @brief Set the data_requested flag
! void SetDataRequested(bool d) { this->data_requested = d; }
private:
/// @brief Lock the mutex associated with this queue.
--- 383,388 ----
/// @brief Set the data_requested flag
! void SetDataRequested(bool d, bool haveLock);
!
private:
/// @brief Lock the mutex associated with this queue.
***************
*** 428,431 ****
--- 424,429 ----
/// been delivered
bool data_requested;
+ /// @brief Flag that data was sent (in PULL mode)
+ bool data_delivered;
};
Index: device.cc
===================================================================
RCS file: /cvsroot/playerstage/code/player/libplayercore/device.cc,v
retrieving revision 1.23
retrieving revision 1.24
diff -C2 -d -r1.23 -r1.24
*** device.cc 14 Nov 2007 21:07:49 -0000 1.23
--- device.cc 7 Dec 2007 01:50:14 -0000 1.24
***************
*** 325,328 ****
--- 325,331 ----
// got something else
resp_queue->ClearFilter();
+ printf("%d:%d:%d:%d\n",
+ hdr->addr.interf, hdr->addr.index,
+ hdr->type, hdr->subtype);
PLAYER_ERROR("got unexpected message");
delete msg;
-------------------------------------------------------------------------
SF.Net email is sponsored by:
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://sourceforge.net/services/buy/index.php
_______________________________________________
Playerstage-commit mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/playerstage-commit