Revision: 6380
http://playerstage.svn.sourceforge.net/playerstage/?rev=6380&view=rev
Author: thjc
Date: 2008-04-23 02:38:51 -0700 (Wed, 23 Apr 2008)
Log Message:
-----------
added group request to blackboard, as per release-2-1-patches 6371
Modified Paths:
--------------
code/player/trunk/client_libs/libplayerc/dev_blackboard.c
code/player/trunk/client_libs/libplayerc/playerc.h
code/player/trunk/client_libs/libplayerc++/blackboardproxy.cc
code/player/trunk/client_libs/libplayerc++/playerc++.h
code/player/trunk/libplayercore/interfaces/064_blackboard.def
code/player/trunk/server/drivers/blackboard/localbb/localbb.cpp
Property Changed:
----------------
code/player/trunk/
Property changes on: code/player/trunk
___________________________________________________________________
Name: svn:ignore
- autom4te.cache
config.h
Makefile
Makefile.in
aclocal.m4
compile
config.guess
config.h.in
config.status
config.sub
configure
depcomp
install-sh
libtool
ltmain.sh
missing
stamp-h1
.project
+ autom4te.cache
config.h
Makefile
Makefile.in
aclocal.m4
compile
config.guess
config.h.in
config.status
config.sub
configure
depcomp
install-sh
libtool
ltmain.sh
missing
stamp-h1
.project
build
Modified: code/player/trunk/client_libs/libplayerc/dev_blackboard.c
===================================================================
--- code/player/trunk/client_libs/libplayerc/dev_blackboard.c 2008-04-23
09:04:14 UTC (rev 6379)
+++ code/player/trunk/client_libs/libplayerc/dev_blackboard.c 2008-04-23
09:38:51 UTC (rev 6380)
@@ -329,6 +329,81 @@
}
+// Subscribe to a blackboard group
+int playerc_blackboard_subscribe_to_group(playerc_blackboard_t* device, const
char* group)
+{
+ player_blackboard_entry_t req;
+ memset(&req, 0, sizeof(req));
+ req.key = strdup("");
+ req.key_count = strlen("") + 1;
+
+ req.group = strdup(group);
+ req.group_count = strlen(group) + 1;
+
+ if (playerc_client_request(device->info.client, &device->info,
+ PLAYER_BLACKBOARD_REQ_SUBSCRIBE_TO_GROUP, &req, NULL) < 0)
+ {
+ if (req.key != NULL)
+ {
+ free(req.key);
+ }
+ if (req.group != NULL)
+ {
+ free(req.group);
+ }
+ PLAYERC_ERR("failed to subscribe to blackboard group");
+ return -1;
+ }
+
+ if (req.key != NULL)
+ {
+ free(req.key);
+ }
+ if (req.group != NULL)
+ {
+ free(req.group);
+ }
+ return 0;
+}
+
+// Unsubscribe from a blackboard group
+int playerc_blackboard_unsubscribe_from_group(playerc_blackboard_t* device,
const char* group)
+{
+ player_blackboard_entry_t req;
+ memset(&req, 0, sizeof(req));
+ req.key = strdup("");
+ req.key_count = strlen("") + 1;
+
+ req.group = strdup(group);
+ req.group_count = strlen(group) + 1;
+
+ if (playerc_client_request(device->info.client, &device->info,
+ PLAYER_BLACKBOARD_REQ_UNSUBSCRIBE_FROM_GROUP, &req, NULL) < 0)
+ {
+ if (req.key)
+ {
+ free(req.key);
+ }
+ if (req.group)
+ {
+ free(req.group);
+ }
+ PLAYERC_ERR("failed to unsubscribe to blackboard group");
+ return -1;
+ }
+
+ if (req.key)
+ {
+ free(req.key);
+ }
+ if (req.group)
+ {
+ free(req.group);
+ }
+ return 0;
+
+}
+
// Set a key
int playerc_blackboard_set_entry(playerc_blackboard_t *device,
player_blackboard_entry_t* entry)
{
Modified: code/player/trunk/client_libs/libplayerc/playerc.h
===================================================================
--- code/player/trunk/client_libs/libplayerc/playerc.h 2008-04-23 09:04:14 UTC
(rev 6379)
+++ code/player/trunk/client_libs/libplayerc/playerc.h 2008-04-23 09:38:51 UTC
(rev 6380)
@@ -1176,6 +1176,12 @@
/** @brief Unsubscribe from a key. */
int playerc_blackboard_unsubscribe_from_key(playerc_blackboard_t *device,
const char* key, const char* group);
+/** @brief Subscribe to a group. The current entries are sent as data
messages. */
+int playerc_blackboard_subscribe_to_group(playerc_blackboard_t *device, const
char* group);
+
+/** @brief Unsubscribe from a group. */
+int playerc_blackboard_unsubscribe_from_group(playerc_blackboard_t *device,
const char* group);
+
/** @brief Set an entry value. */
int playerc_blackboard_set_entry(playerc_blackboard_t *device,
player_blackboard_entry_t* entry);
Modified: code/player/trunk/client_libs/libplayerc++/blackboardproxy.cc
===================================================================
--- code/player/trunk/client_libs/libplayerc++/blackboardproxy.cc
2008-04-23 09:04:14 UTC (rev 6379)
+++ code/player/trunk/client_libs/libplayerc++/blackboardproxy.cc
2008-04-23 09:38:51 UTC (rev 6380)
@@ -97,7 +97,7 @@
if (0 != playerc_blackboard_subscribe_to_key(mDevice, key, group, &pointer))
{
- throw PlayerError("BlackBoardProxy::SubscribeToKey(const string& key)",
"could not subscribe to key");
+ throw PlayerError("BlackBoardProxy::SubscribeToKey(const char* key,
const char* group)", "could not subscribe to key");
}
assert(pointer);
@@ -142,10 +142,29 @@
scoped_lock_t lock(mPc->mMutex);
if (0 != playerc_blackboard_unsubscribe_from_key(mDevice, key, group))
{
- throw PlayerError("BlackBoardProxy::UnsubscribeFromKey(const
string& key)", "could not unsubscribe from key");
+ throw PlayerError("BlackBoardProxy::UnsubscribeFromKey(const
char* key, const char* group)", "could not unsubscribe from key");
}
}
+void BlackBoardProxy::SubscribeToGroup(const char* group)
+{
+ scoped_lock_t lock(mPc->mMutex);
+
+ if (0 != playerc_blackboard_subscribe_to_group(mDevice, group))
+ {
+ throw PlayerError("BlackBoardProxy::SubscribeToGroup(const char*
group)", "could not subscribe to group");
+ }
+}
+
+void BlackBoardProxy::UnsubscribeFromGroup(const char* group)
+{
+ scoped_lock_t lock(mPc->mMutex);
+ if (0 != playerc_blackboard_unsubscribe_from_group(mDevice, group))
+ {
+ throw PlayerError("BlackBoardProxy::UnsubscribeFromGroup(const
char* group)", "could not unsubscribe from group");
+ }
+}
+
void BlackBoardProxy::SetEntry(const player_blackboard_entry_t &entry)
{
scoped_lock_t lock(mPc->mMutex);
Modified: code/player/trunk/client_libs/libplayerc++/playerc++.h
===================================================================
--- code/player/trunk/client_libs/libplayerc++/playerc++.h 2008-04-23
09:04:14 UTC (rev 6379)
+++ code/player/trunk/client_libs/libplayerc++/playerc++.h 2008-04-23
09:38:51 UTC (rev 6380)
@@ -347,6 +347,10 @@
player_blackboard_entry_t *SubscribeToKey(const char *key, const char*
group = "");
/** Stop receiving updates about this key. */
void UnsubscribeFromKey(const char *key, const char* group = "");
+ /** Subscribe to a group. The event handler must be set to retrieve the
current group entries. */
+ void SubscribeToGroup(const char* key);
+ /** Stop receiving updates about this group. */
+ void UnsubscribeFromGroup(const char* group);
/** Set a key value */
void SetEntry(const player_blackboard_entry_t &entry);
/** Set the function pointer which will be called when an entry is
updated. */
Modified: code/player/trunk/libplayercore/interfaces/064_blackboard.def
===================================================================
--- code/player/trunk/libplayercore/interfaces/064_blackboard.def
2008-04-23 09:04:14 UTC (rev 6379)
+++ code/player/trunk/libplayercore/interfaces/064_blackboard.def
2008-04-23 09:38:51 UTC (rev 6380)
@@ -9,6 +9,10 @@
message { REQ, UNSUBSCRIBE_FROM_KEY, 2, player_blackboard_entry_t };
/** Request/reply subtype: set entry. */
message { REQ, SET_ENTRY, 3, player_blackboard_entry_t };
+/** Request/reply subtype: subscribe to group. */
+message { REQ, SUBSCRIBE_TO_GROUP, 4, player_blackboard_entry_t };
+/** Request/reply subtype: unsubscribe from group. */
+message { REQ, UNSUBSCRIBE_FROM_GROUP, 5, player_blackboard_entry_t };
/** Data update reply */
message { DATA, UPDATE, 1, player_blackboard_entry_t };
Modified: code/player/trunk/server/drivers/blackboard/localbb/localbb.cpp
===================================================================
--- code/player/trunk/server/drivers/blackboard/localbb/localbb.cpp
2008-04-23 09:04:14 UTC (rev 6379)
+++ code/player/trunk/server/drivers/blackboard/localbb/localbb.cpp
2008-04-23 09:38:51 UTC (rev 6380)
@@ -220,6 +220,24 @@
* @return 0 for success, -1 on error.
*/
int ProcessSetEntryMessage(QueuePointer &resp_queue,
player_msghdr * hdr, void * data);
+ /** @brief Process a subscribe to group message.
+ * Adds the response queue to the list of devices listenening
to that group in the map.
+ * Retrieves the entries for that group.
+ * Publishes the entries.
+ * @param resp_queue Player response queue.
+ * @param hdr Message header.
+ * @param data Message data.
+ * @return 0 for success, -1 on error.
+ */
+ int ProcessSubscribeGroupMessage(QueuePointer &resp_queue,
player_msghdr * hdr, void * data);
+ /** @brief Process an unsubscribe from group message.
+ * Removes the response queue from the list of devices
listening to that group in the map.
+ * @param resp_queue Player response queue.
+ * @param hdr Message header.
+ * @param data Message data.
+ * @return 0 for success, -1 on error.
+ * */
+ int ProcessUnsubscribeGroupMessage(QueuePointer &resp_queue,
player_msghdr * hdr, void * data);
// Blackboard handler functions
/** @brief Add the key and queue combination to the listeners
hash-map and return the entry for the key.
@@ -231,9 +249,23 @@
/** @brief Remove the key and queue combination from the
listeners hash-map.
* @param key Entry key.
* @param group Second identifier.
- * @param qp Player response queue of the subscriber.
+ * @param qp Player response queue of the unsubscriber.
+ * @return Blackboard entry containing the value of the key and
group.
*/
void UnsubscribeKey(const string &key, const string &group,
const QueuePointer &qp);
+ /** @brief Add a group to the group listeners hash-map and
return all entries of that group
+ * @param group Entry gruop
+ * @param qp resp_queue Player response queue of the subscriber
+ * @return Vector of blackboard entries of that group
+ */
+ vector<BlackBoardEntry> SubscribeGroup(const string &group,
const QueuePointer &qp);
+ /**
+ * @brief Remove the group from the group listeners hash-map.
+ * @param group Entry group
+ * @param qp Player response queue of the unsubscriber
+ */
+ void UnsubscribeGroup(const string &group, const QueuePointer
&qp);
+
/** @brief Set the entry in the entries hashmap. *
* @param entry BlackBoardEntry that must be put in the hashmap.
*/
@@ -246,12 +278,18 @@
bool CheckHeader(player_msghdr * hdr);
// Internal blackboard data
- /** Map of labels to entry data. */
- //map<group, map<key, entry> >
+ /** Map of labels to entry data.
+ * map<group, map<key, entry> >
+ */
map<string, map<string, BlackBoardEntry> > entries;
- /** Map of labels to listening queues. */
- //map<group, map<key, vector<device queue> > >
+ /** Map of labels to listening queues.
+ * map<group, map<key, vector<device queue> > >
+ */
map<string, map<string, vector<QueuePointer> > > listeners;
+ /** Map of groups to queues subscribed to groups.
+ * map<group, vector<device queue> >
+ */
+ map<string, vector<QueuePointer> > group_listeners;
};
////////////////////////////////////////////////////////////////////////////////
@@ -318,6 +356,14 @@
{
return ProcessSetEntryMessage(resp_queue, hdr, data);
}
+ else if (Message::MatchMessage(hdr, PLAYER_MSGTYPE_REQ,
PLAYER_BLACKBOARD_REQ_SUBSCRIBE_TO_GROUP, this->device_addr))
+ {
+ return ProcessSubscribeGroupMessage(resp_queue, hdr, data);
+ }
+ else if (Message::MatchMessage(hdr, PLAYER_MSGTYPE_REQ,
PLAYER_BLACKBOARD_REQ_UNSUBSCRIBE_FROM_GROUP, this->device_addr))
+ {
+ return ProcessUnsubscribeGroupMessage(resp_queue, hdr, data);
+ }
// Don't know how to handle this message
return -1;
}
@@ -388,6 +434,84 @@
}
////////////////////////////////////////////////////////////////////////////////
+// Subscribe a device to a group. Send out data messages for the current group
entries.
+int LocalBB::ProcessSubscribeGroupMessage(QueuePointer &resp_queue,
player_msghdr * hdr, void * data)
+{
+ if (!CheckHeader(hdr))
+ return -1;
+
+ // Add the device to the listeners map
+ player_blackboard_entry_t *request =
reinterpret_cast<player_blackboard_entry_t*>(data);
+ vector<BlackBoardEntry> current_values = SubscribeGroup(request->group,
resp_queue);
+
+ // Get the entries for the given key and send the data updates
+ for (vector<BlackBoardEntry>::iterator itr=current_values.begin(); itr
!= current_values.end(); itr++)
+ {
+ BlackBoardEntry current_value = *itr;
+ player_blackboard_entry_t response =
ToPlayerBlackBoardEntry(current_value);
+ size_t response_size = sizeof(player_blackboard_entry_t) +
response.key_count + response.group_count + response.data_count;
+
+ // Publish the blackboard entries
+ this->Publish(this->device_addr,
+ resp_queue,
+ PLAYER_MSGTYPE_DATA,
+ PLAYER_BLACKBOARD_DATA_UPDATE,
+ &response,
+ response_size,
+ NULL);
+
+ if (response.key)
+ {
+ delete [] response.key;
+ }
+ if (response.group)
+ {
+ delete [] response.group;
+ }
+ if (response.data)
+ {
+ delete [] response.data;
+ }
+ }
+
+ // Then send an empty ack
+ this->Publish(
+ this->device_addr,
+ resp_queue,
+ PLAYER_MSGTYPE_RESP_ACK,
+ PLAYER_BLACKBOARD_REQ_SUBSCRIBE_TO_GROUP,
+ NULL,
+ 0,
+ NULL);
+
+ return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Unsubscribe a device from a group.
+int LocalBB::ProcessUnsubscribeGroupMessage(QueuePointer &resp_queue,
player_msghdr * hdr, void * data)
+{
+ if (!CheckHeader(hdr))
+ return -1;
+
+ // Remove the device from the group listeners map
+ player_blackboard_entry_t *request =
reinterpret_cast<player_blackboard_entry_t*>(data);
+ UnsubscribeGroup(request->group, resp_queue);
+
+ // Send back an empty ack
+ this->Publish(
+ this->device_addr,
+ resp_queue,
+ PLAYER_MSGTYPE_RESP_ACK,
+ PLAYER_BLACKBOARD_REQ_UNSUBSCRIBE_FROM_KEY,
+ NULL,
+ 0,
+ NULL);
+
+ return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
// Set an entry and send out update events to all listeners.
int LocalBB::ProcessSetEntryMessage(QueuePointer &resp_queue, player_msghdr *
hdr, void * data)
{
@@ -399,7 +523,7 @@
SetEntry(entry);
- // Send out update events to other listening devices
+ // Send out update events to other listening devices for key group
combinations
vector<QueuePointer> &devices = listeners[entry.group][entry.key];
for (vector<QueuePointer>::iterator itr=devices.begin(); itr !=
devices.end(); itr++)
@@ -414,6 +538,20 @@
NULL);
}
+ // Send out update events to just groups
+ vector<QueuePointer> &devices_groups = group_listeners[entry.group];
+ for (vector<QueuePointer>::iterator itr=devices_groups.begin(); itr !=
devices_groups.end(); itr++)
+ {
+ QueuePointer device_queue = (*itr);
+ this->Publish(this->device_addr,
+ device_queue,
+ PLAYER_MSGTYPE_DATA,
+ PLAYER_BLACKBOARD_DATA_UPDATE,
+ data,
+ hdr->size,
+ NULL);
+ }
+
// Send back an empty ack
this->Publish(this->device_addr,
resp_queue,
@@ -457,6 +595,39 @@
}
////////////////////////////////////////////////////////////////////////////////
+// Add a device to the group listener map. Return vector of entries for that
group.
+vector<BlackBoardEntry> LocalBB::SubscribeGroup(const string &group, const
QueuePointer &qp)
+{
+ group_listeners[group].push_back(qp);
+ vector<BlackBoardEntry> group_entries;
+
+ // Add all entries for a group to the group_entries vector
+ //map<group, map<key, entry> >
+ map<string, BlackBoardEntry> entry_map = entries[group];
+ for (map<string, BlackBoardEntry>::iterator itr = entry_map.begin();
itr != entry_map.end(); itr++)
+ {
+ group_entries.push_back((*itr).second);
+ }
+ return group_entries;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Remove a device from the group listener map
+void LocalBB::UnsubscribeGroup(const string &group, const QueuePointer &qp)
+{
+ vector<QueuePointer> &devices = group_listeners[group];
+
+ for (vector<QueuePointer>::iterator itr = devices.begin(); itr !=
devices.end(); itr++)
+ {
+ if ((*itr) == qp)
+ {
+ devices.erase(itr);
+ break;
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
// Set entry value in the entries map.
void LocalBB::SetEntry(const BlackBoardEntry &entry)
{
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
-------------------------------------------------------------------------
This SF.net email is sponsored by the 2008 JavaOne(SM) Conference
Don't miss this year's exciting event. There's still time to save $100.
Use priority code J8TL2D2.
http://ad.doubleclick.net/clk;198757673;13503038;p?http://java.sun.com/javaone
_______________________________________________
Playerstage-commit mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/playerstage-commit