Author: dlee
Date: Thu Aug  8 12:46:30 2013
New Revision: 396398

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=396398
Log:
Bridge enter/leave events

Modified:
    team/dlee/ASTERISK-21969/main/stasis_bridges.c
    team/dlee/ASTERISK-21969/res/res_stasis.c
    team/dlee/ASTERISK-21969/res/stasis/app.c
    team/dlee/ASTERISK-21969/res/stasis/app.h

Modified: team/dlee/ASTERISK-21969/main/stasis_bridges.c
URL: 
http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/main/stasis_bridges.c?view=diff&rev=396398&r1=396397&r2=396398
==============================================================================
--- team/dlee/ASTERISK-21969/main/stasis_bridges.c (original)
+++ team/dlee/ASTERISK-21969/main/stasis_bridges.c Thu Aug  8 12:46:30 2013
@@ -132,6 +132,8 @@
 
 static struct ast_manager_event_blob *attended_transfer_to_ami(struct 
stasis_message *message);
 static struct ast_manager_event_blob *blind_transfer_to_ami(struct 
stasis_message *message);
+static struct ast_json *ast_channel_entered_bridge_to_json(struct 
stasis_message *msg);
+static struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message 
*msg);
 
 static struct stasis_cp_all *bridge_cache_all;
 
@@ -140,8 +142,10 @@
  */
 STASIS_MESSAGE_TYPE_DEFN(ast_bridge_snapshot_type);
 STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type);
-STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type,
+       .to_json = ast_channel_entered_bridge_to_json);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type,
+       .to_json = ast_channel_left_bridge_to_json);
 STASIS_MESSAGE_TYPE_DEFN(ast_blind_transfer_type, .to_ami = 
blind_transfer_to_ami);
 STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = 
attended_transfer_to_ami);
 /*! @} */
@@ -415,6 +419,35 @@
        /* state first, then leave blob (opposite of enter, preserves nesting 
of events) */
        bridge_publish_state_from_blob(bridge, stasis_message_data(msg));
        stasis_publish(ast_bridge_topic(bridge), msg);
+}
+
+static struct ast_json *simple_bridge_channel_event(
+        const char *type,
+        struct ast_bridge_snapshot *bridge_snapshot,
+        struct ast_channel_snapshot *channel_snapshot,
+        const struct timeval *tv)
+{
+        return ast_json_pack("{s: s, s: o, s: o, s: o}",
+                "type", type,
+                "timestamp", ast_json_timeval(*tv, NULL),
+                "bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
+                "channel", ast_channel_snapshot_to_json(channel_snapshot));
+}
+
+struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg)
+{
+        struct ast_bridge_blob *obj = stasis_message_data(msg);
+
+       return simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
+                obj->channel, stasis_message_timestamp(msg));
+}
+
+struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg)
+{
+        struct ast_bridge_blob *obj = stasis_message_data(msg);
+
+       return simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
+                obj->channel, stasis_message_timestamp(msg));
 }
 
 typedef struct ast_json *(*json_item_serializer_cb)(void *obj);

Modified: team/dlee/ASTERISK-21969/res/res_stasis.c
URL: 
http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/res_stasis.c?view=diff&rev=396398&r1=396397&r2=396398
==============================================================================
--- team/dlee/ASTERISK-21969/res/res_stasis.c (original)
+++ team/dlee/ASTERISK-21969/res/res_stasis.c Thu Aug  8 12:46:30 2013
@@ -455,7 +455,6 @@
 
        RAII_VAR(struct app *, app, NULL, ao2_cleanup);
        RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
-       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
        int res = 0;
 
        ast_assert(chan != NULL);
@@ -486,9 +485,9 @@
                return -1;
        }
 
-       forwards = app_subscribe_channel(app, chan);
-       if (!forwards) {
-               ast_log(LOG_ERROR, "Error subscribing app %s to channel %s\n",
+       res = app_subscribe_channel(app, chan);
+       if (res != 0) {
+               ast_log(LOG_ERROR, "Error subscribing app '%s' to channel 
'%s'\n",
                        app_name, ast_channel_name(chan));
                return -1;
        }
@@ -497,8 +496,18 @@
                RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
                int r;
                int command_count;
-
-               if (stasis_app_get_bridge(control)) {
+               struct ast_bridge *last_bridge = NULL;
+               struct ast_bridge *bridge = NULL;
+
+               last_bridge = bridge;
+               bridge = stasis_app_get_bridge(control);
+
+               if (bridge != last_bridge) {
+                       app_unsubscribe_bridge(app, last_bridge);
+                       app_subscribe_bridge(app, bridge);
+               }
+
+               if (bridge) {
                        /* Bridge is handling channel frames */
                        control_wait(control);
                        control_dispatch_all(control, chan);
@@ -548,6 +557,8 @@
                }
        }
 
+       app_unsubscribe_channel(app, chan);
+
        res = send_end_msg(app, chan);
        if (res != 0) {
                ast_log(LOG_ERROR,

Modified: team/dlee/ASTERISK-21969/res/stasis/app.c
URL: 
http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/stasis/app.c?view=diff&rev=396398&r1=396397&r2=396398
==============================================================================
--- team/dlee/ASTERISK-21969/res/stasis/app.c (original)
+++ team/dlee/ASTERISK-21969/res/stasis/app.c Thu Aug  8 12:46:30 2013
@@ -35,12 +35,12 @@
 #include "asterisk/stasis_message_router.h"
 
 struct app {
-       /*! Atomic integer counting channels in this app */
-       int channel_count;
        /*! Aggregation topic for this application. */
        struct stasis_topic *topic;
        /*! Router for handling messages forwarded to \a topic. */
        struct stasis_message_router *router;
+       /*! Container of the channel forwards to this app's topic. */
+       struct ao2_container *forwards;
        /*! Callback function for this application. */
        stasis_app_cb handler;
        /*! Opaque data to hand to callback function. */
@@ -50,14 +50,149 @@
 };
 
 struct app_forwards {
-       struct app *app;
-
-       struct stasis_subscription *channel_forward;
-       struct stasis_subscription *channel_cached_forward;
-
-       struct stasis_subscription *bridge_forward;
-       struct stasis_subscription *bridge_cached_forward;
+       /*! Count of number of times this channel/bridge has been subscribed */
+       int interested;
+
+       /*! Forward for the regular topic */
+       struct stasis_subscription *topic_forward;
+       /*! Forward for the caching topic */
+       struct stasis_subscription *topic_cached_forward;
+
+       /*! Unique id of the object being forwarded */
+       char id[];
 };
+
+static void forwards_dtor(void *obj)
+{
+       struct app_forwards *forwards = obj;
+
+       ast_assert(forwards->topic_forward == NULL);
+       ast_assert(forwards->topic_cached_forward == NULL);
+}
+
+static void forwards_unsubscribe(struct app_forwards *forwards)
+{
+       stasis_unsubscribe(forwards->topic_forward);
+       forwards->topic_forward = NULL;
+       stasis_unsubscribe(forwards->topic_cached_forward);
+       forwards->topic_cached_forward = NULL;
+}
+
+static struct app_forwards *forwards_create(struct app *app,
+       const char *id)
+{
+       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+       if (!app || !id) {
+               return NULL;
+       }
+
+       forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor);
+       if (!forwards) {
+               return NULL;
+       }
+
+       strcpy(forwards->id, id);
+
+       ao2_ref(forwards, +1);
+       return forwards;
+}
+
+static struct app_forwards *forwards_create_channel(struct app *app,
+       struct ast_channel *chan)
+{
+       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+       if (!app || !chan) {
+               return NULL;
+       }
+
+       forwards = forwards_create(app, ast_channel_uniqueid(chan));
+       if (!forwards) {
+               return NULL;
+       }
+
+       forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
+               app->topic);
+       if (!forwards->topic_forward) {
+               return NULL;
+       }
+
+       forwards->topic_cached_forward = stasis_forward_all(
+               ast_channel_topic_cached(chan), app->topic);
+       if (!forwards->topic_cached_forward) {
+               /* Half-subscribed is a bad thing */
+               stasis_unsubscribe(forwards->topic_forward);
+               forwards->topic_forward = NULL;
+               return NULL;
+       }
+
+       ao2_ref(forwards, +1);
+       return forwards;
+}
+
+static struct app_forwards *forwards_create_bridge(struct app *app,
+       struct ast_bridge *bridge)
+{
+       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+       if (!app || !bridge) {
+               return NULL;
+       }
+
+       forwards = forwards_create(app, bridge->uniqueid);
+       if (!forwards) {
+               return NULL;
+       }
+
+       forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
+               app->topic);
+       if (!forwards->topic_forward) {
+               return NULL;
+       }
+
+       forwards->topic_cached_forward = stasis_forward_all(
+               ast_bridge_topic_cached(bridge), app->topic);
+       if (!forwards->topic_cached_forward) {
+               /* Half-subscribed is a bad thing */
+               stasis_unsubscribe(forwards->topic_forward);
+               forwards->topic_forward = NULL;
+               return NULL;
+       }
+
+       ao2_ref(forwards, +1);
+       return forwards;
+}
+
+static int forwards_sort(const void *obj_left, const void *obj_right, int 
flags)
+{
+    const struct app_forwards *object_left = obj_left;
+    const struct app_forwards *object_right = obj_right;
+    const char *right_key = obj_right;
+    int cmp;
+
+    switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
+    case OBJ_POINTER:
+        right_key = object_right->id;
+        /* Fall through */
+    case OBJ_KEY:
+        cmp = strcmp(object_left->id, right_key);
+        break;
+    case OBJ_PARTIAL_KEY:
+        /*
+         * We could also use a partial key struct containing a length
+         * so strlen() does not get called for every comparison instead.
+         */
+        cmp = strncmp(object_left->id, right_key, strlen(right_key));
+        break;
+    default:
+        /* Sort can only work on something with a full or partial key. */
+        ast_assert(0);
+        cmp = 0;
+        break;
+    }
+    return cmp;
+}
 
 static void app_dtor(void *obj)
 {
@@ -84,6 +219,55 @@
        app_send(app, json);
 }
 
+static struct ast_json *simple_bridge_event(
+        const char *type,
+        struct ast_bridge_snapshot *snapshot,
+        const struct timeval *tv)
+{
+        return ast_json_pack("{s: s, s: o, s: o}",
+                "type", type,
+                "timestamp", ast_json_timeval(*tv, NULL),
+                "bridge", ast_bridge_snapshot_to_json(snapshot));
+}
+
+/*!
+ * \brief Report bridge creation and destruction to the application.
+ *
+ * Handles cache updates for bridge snapshots. An update with no new
+ * snapshot is sent as "BridgeDestroyed"; an update with no old snapshot
+ * is sent as "BridgeCreated". Updates carrying both snapshots produce no
+ * event from this handler.
+ *
+ * \param data The \c struct \c app the event is sent to.
+ * \param sub Unused.
+ * \param topic Unused.
+ * \param message Message expected to be a stasis_cache_update.
+ */
+static void sub_bridge_update_handler(void *data,
+       struct stasis_subscription *sub,
+       struct stasis_topic *topic,
+       struct stasis_message *message)
+{
+       struct app *app = data;
+       struct stasis_cache_update *update;
+       struct ast_bridge_snapshot *new_snapshot;
+       struct ast_bridge_snapshot *old_snapshot;
+       const struct timeval *tv;
+       RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+
+       ast_assert(stasis_message_type(message) == stasis_cache_update_type());
+
+       update = stasis_message_data(message);
+
+       ast_assert(update->type == ast_bridge_snapshot_type());
+
+       /* NOTE(review): relies on stasis_message_data() yielding NULL for a
+        * missing snapshot message - confirm against the stasis API. */
+       new_snapshot = stasis_message_data(update->new_snapshot);
+       old_snapshot = stasis_message_data(update->old_snapshot);
+
+       /* Prefer the new snapshot's timestamp; fall back to the update's. */
+       tv = update->new_snapshot ?
+               stasis_message_timestamp(update->new_snapshot) :
+               stasis_message_timestamp(message);
+
+       if (!new_snapshot) {
+               json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
+       } else if (!old_snapshot) {
+               /* There is no old snapshot in this branch; describe the new one. */
+               json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
+       }
+
+       if (!json) {
+               return;
+       }
+
+       app_send(app, json);
+}
+
 struct app *app_create(const char *name, stasis_app_cb handler, void *data)
 {
        RAII_VAR(struct app *, app, NULL, ao2_cleanup);
@@ -102,6 +286,10 @@
                return NULL;
        }
 
+       app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
+               AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
+               forwards_sort, NULL);
+
        app->topic = stasis_topic_create(name);
        if (!app->topic) {
                return NULL;
@@ -112,26 +300,13 @@
                return NULL;
        }
 
+        res |= stasis_message_router_add_cache_update(app->router,
+               ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
+
        /*
-       res |= stasis_message_router_add(app->router,
-               ast_channel_user_event_type(), sub_channel_blob_handler, app);
-        res |= stasis_message_router_add(app->router,
-               ast_channel_varset_type(), sub_channel_blob_handler, app);
-        res |= stasis_message_router_add(app->router,
-               ast_channel_dtmf_end_type(), sub_channel_blob_handler, app);
-        res |= stasis_message_router_add(app->router,
-               ast_channel_hangup_request_type(), sub_channel_blob_handler,
-               app);
-        res |= stasis_message_router_add(app->router,
-               stasis_cache_update_type(), sub_bridge_snapshot_handler, app);
         res |= stasis_message_router_add(app->router,
                ast_bridge_merge_message_type(), sub_bridge_merge_handler,
                app);
-        res |= stasis_message_router_add(app->router,
-               ast_channel_entered_bridge_type(), sub_bridge_enter_handler,
-               app);
-        res |= stasis_message_router_add(app->router,
-               ast_channel_left_bridge_type(), sub_bridge_leave_handler, app);
        */
        res |= stasis_message_router_set_default(app->router,
                sub_default_handler, app);
@@ -199,8 +374,7 @@
 {
        SCOPED_AO2LOCK(lock, app);
 
-       return app->handler == NULL &&
-               ast_atomic_fetchadd_int(&app->channel_count, 0) == 0;
+       return app->handler == NULL && ao2_container_count(app->forwards) == 0;
 }
 
 void app_update(struct app *app, stasis_app_cb handler, void *data)
@@ -236,96 +410,93 @@
        return app->name;
 }
 
-static void forwards_dtor(void *obj)
-{
-       struct app_forwards *forwards = obj;
-
-       stasis_unsubscribe(forwards->channel_forward);
-       forwards->channel_forward = NULL;
-       stasis_unsubscribe(forwards->channel_cached_forward);
-       forwards->channel_cached_forward = NULL;
-       stasis_unsubscribe(forwards->bridge_forward);
-       forwards->bridge_forward = NULL;
-       stasis_unsubscribe(forwards->bridge_cached_forward);
-       forwards->bridge_cached_forward = NULL;
-
-       ast_atomic_fetchadd_int(&forwards->app->channel_count, -1);
-
-       ao2_cleanup(forwards->app);
-       forwards->app = NULL;
-}
-
-struct app_forwards *app_subscribe_channel(struct app *app,
-       struct ast_channel *chan)
+int app_subscribe_channel(struct app *app, struct ast_channel *chan)
+{
+       if (!app || !chan) {
+               return -1;
+       } else {
+               RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+               SCOPED_AO2LOCK(lock, app->forwards);
+
+               forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
+                       OBJ_SEARCH_KEY | OBJ_NOLOCK);
+               if (!forwards) {
+                       /* Forwards not found, create one */
+                       forwards = forwards_create_channel(app, chan);
+                       if (!forwards) {
+                               return -1;
+                       }
+                       ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+               }
+
+               ++forwards->interested;
+               return 0;
+       }
+}
+
+/*!
+ * \brief Drop one interest in a forwarded object, tearing it down on the last.
+ *
+ * Looks up the forwards entry keyed by \a id in the app's forwards
+ * container while holding the container lock. When the interest count
+ * reaches zero the topic forwards are cancelled and the entry is unlinked.
+ *
+ * \param app Application whose forwards container is searched.
+ * \param kind Label used only in the error message (e.g. "channel").
+ * \param id Unique id of the forwarded object.
+ * \return 0 on success.
+ * \return -1 if the app has no forwards entry for \a id.
+ */
+static int unsubscribe(struct app *app, const char *kind, const char *id)
 {
        RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
-
+       SCOPED_AO2LOCK(lock, app->forwards);
+
+       forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (!forwards) {
+               ast_log(LOG_ERROR,
+                       "App '%s' not subscribed to %s '%s'\n",
+                       app->name, kind, id);
+               return -1;
+       }
+
+       if (--forwards->interested == 0) {
+               /* No one is interested any more; unsubscribe */
+               forwards_unsubscribe(forwards);
+               ao2_find(app->forwards, forwards,
+                       OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
+                       OBJ_NODATA);
+       }
+
+       return 0;
+}
+
+int app_unsubscribe_channel(struct app *app, struct ast_channel *chan)
+{
        if (!app || !chan) {
-               return NULL;
-       }
-
-       forwards = ao2_alloc(sizeof(*forwards), forwards_dtor);
-       if (!forwards) {
-               return NULL;
-       }
-
-       ao2_ref(app, +1);
-       forwards->app = app;
-
-       forwards->channel_forward = stasis_forward_all(ast_channel_topic(chan),
-               app->topic);
-       if (!forwards->channel_forward) {
-               return NULL;
-       }
-
-       forwards->channel_cached_forward = stasis_forward_all(
-               ast_channel_topic_cached(chan), app->topic);
-       if (!forwards->channel_cached_forward) {
-               return NULL;
-       }
-
-       ast_atomic_fetchadd_int(&forwards->app->channel_count, +1);
-       ao2_ref(forwards, +1);
-       return forwards;
-}
-
-int app_subscribe_bridge(struct app_forwards *forwards,
-       struct ast_bridge *bridge)
-{
-       if (!forwards || !bridge) {
                return -1;
        }
 
-       ast_assert(forwards->app != NULL);
-
-       app_unsubscribe_bridge(forwards);
-
-       forwards->bridge_forward = stasis_forward_all(
-               ast_bridge_topic(bridge), forwards->app->topic);
-       if (!forwards->bridge_forward) {
+       return unsubscribe(app, "channel", ast_channel_uniqueid(chan));
+}
+
+int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge)
+{
+       if (!app || !bridge) {
                return -1;
-       }
-
-       forwards->bridge_cached_forward = stasis_forward_all(
-               ast_bridge_topic_cached(bridge), forwards->app->topic);
-       if (!forwards->bridge_cached_forward) {
-               /* Probably a bad idea to stay half-subscribed */
-               stasis_unsubscribe(forwards->bridge_forward);
-               forwards->bridge_forward = NULL;
+       } else {
+               RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+               SCOPED_AO2LOCK(lock, app->forwards);
+
+               forwards = ao2_find(app->forwards, bridge->uniqueid,
+                       OBJ_SEARCH_KEY | OBJ_NOLOCK);
+
+               if (!forwards) {
+                       /* Forwards not found, create one */
+                       forwards = forwards_create_bridge(app, bridge);
+                       if (!forwards) {
+                               return -1;
+                       }
+                       ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
+               }
+
+               ++forwards->interested;
+               return 0;
+       }
+}
+
+int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge)
+{
+       if (!app || !bridge) {
                return -1;
        }
 
-       return 0;
-}
-
-void app_unsubscribe_bridge(struct app_forwards *forwards)
-{
-       if (!forwards) {
-               return;
-       }
-
-       stasis_unsubscribe(forwards->bridge_forward);
-       forwards->bridge_forward = NULL;
-       stasis_unsubscribe(forwards->bridge_cached_forward);
-       forwards->bridge_cached_forward = NULL;
-}
+       return unsubscribe(app, "bridge", bridge->uniqueid);
+}

Modified: team/dlee/ASTERISK-21969/res/stasis/app.h
URL: 
http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/stasis/app.h?view=diff&rev=396398&r1=396397&r2=396398
==============================================================================
--- team/dlee/ASTERISK-21969/res/stasis/app.h (original)
+++ team/dlee/ASTERISK-21969/res/stasis/app.h Thu Aug  8 12:46:30 2013
@@ -116,19 +116,25 @@
  * \return 0 on success.
  * \return Non-zero on error.
  */
-struct app_forwards *app_subscribe_channel(struct app *app,
-       struct ast_channel *chan);
+int app_subscribe_channel(struct app *app, struct ast_channel *chan);
+
+/*!
+ * \brief Cancel the subscription an app has for a channel.
+ *
+ * \param app Subscribing application.
+ * \param chan Channel whose subscription is being cancelled.
+ */
+int app_unsubscribe_channel(struct app *app, struct ast_channel *chan);
 
 /*!
  * \brief Add a bridge subscription to an existing channel subscription.
  *
- * \param forwards Return from app_subscribe_channel().
+ * \param app Application.
  * \param bridge Bridge to subscribe to.
  * \return 0 on success.
  * \return Non-zero on error.
  */
-int app_subscribe_bridge(struct app_forwards *forwards,
-       struct ast_bridge *bridge);
+int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge);
 
 /*!
  * \brief Cancel the bridge subscription for an application.
@@ -138,6 +144,6 @@
  * \return 0 on success.
  * \return Non-zero on error.
  */
-void app_unsubscribe_bridge(struct app_forwards *forwards);
+int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge);
 
 #endif /* _ASTERISK_RES_STASIS_APP_H */


--
_____________________________________________________________________
-- Bandwidth and Colocation Provided by http://www.api-digital.com --

svn-commits mailing list
To UNSUBSCRIBE or update options visit:
   http://lists.digium.com/mailman/listinfo/svn-commits

Reply via email to