Author: dlee
Date: Wed Aug  7 15:53:51 2013
New Revision: 396360

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=396360
Log:
The pieces are scattered all over the floor.

Modified:
    team/dlee/ASTERISK-21969/res/res_stasis.c
    team/dlee/ASTERISK-21969/res/stasis/app.c
    team/dlee/ASTERISK-21969/res/stasis/app.h

Modified: team/dlee/ASTERISK-21969/res/res_stasis.c
URL: 
http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/res_stasis.c?view=diff&rev=396360&r1=396359&r2=396360
==============================================================================
--- team/dlee/ASTERISK-21969/res/res_stasis.c (original)
+++ team/dlee/ASTERISK-21969/res/res_stasis.c Wed Aug  7 15:53:51 2013
@@ -82,6 +82,12 @@
 #define CONTROLS_NUM_BUCKETS 127
 
 /*!
+ * \brief Number of buckets for the Stasis bridges hash table.  Remember to
+ * keep it a prime number!
+ */
+#define BRIDGES_NUM_BUCKETS 127
+
+/*!
  * \brief Stasis application container.
  */
 struct ao2_container *apps_registry;
@@ -89,12 +95,6 @@
 struct ao2_container *app_controls;
 
 struct ao2_container *app_bridges;
-
-/*! \brief Message router for the channel caching topic */
-struct stasis_message_router *channel_router;
-
-/*! \brief Message router for the bridge caching topic */
-struct stasis_message_router *bridge_router;
 
 /*! AO2 hash function for \ref app */
 static int app_hash(const void *obj, const int flags)
@@ -202,36 +202,6 @@
 
 /*! \brief Typedef for blob handler callbacks */
 typedef struct ast_json *(*channel_blob_handler_cb)(struct ast_channel_blob *);
-
-/*! \brief Callback to check whether an app is watching a given channel */
-static int app_watching_channel_cb(void *obj, void *arg, int flags)
-{
-       struct app *app = obj;
-       char *uniqueid = arg;
-
-       return app_is_watching_channel(app, uniqueid) ? CMP_MATCH : 0;
-}
-
-/*! \brief Get a container full of apps that are interested in the specified 
channel */
-static struct ao2_container *get_apps_watching_channel(const char *uniqueid)
-{
-       struct ao2_container *watching_apps;
-       char *uniqueid_dup;
-       RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, 
ao2_iterator_destroy);
-       ast_assert(uniqueid != NULL);
-
-       uniqueid_dup = ast_strdupa(uniqueid);
-
-       watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, 
app_watching_channel_cb, uniqueid_dup);
-       watching_apps = watching_apps_iter->c;
-
-       if (!ao2_container_count(watching_apps)) {
-               return NULL;
-       }
-
-       ao2_ref(watching_apps, +1);
-       return watching_apps_iter->c;
-}
 
 /*! \brief Typedef for callbacks that get called on channel snapshot updates */
 typedef struct ast_json *(*channel_snapshot_monitor)(
@@ -354,75 +324,6 @@
        channel_dialplan,
        channel_callerid
 };
-
-static int app_send_cb(void *obj, void *arg, int flags)
-{
-       struct app *app = obj;
-       struct ast_json *msg = arg;
-
-       app_send(app, msg);
-       return 0;
-}
-
-static void sub_channel_snapshot_handler(void *data,
-               struct stasis_subscription *sub,
-               struct stasis_topic *topic,
-               struct stasis_message *message)
-{
-       RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
-       struct stasis_cache_update *update = stasis_message_data(message);
-       struct ast_channel_snapshot *new_snapshot = 
stasis_message_data(update->new_snapshot);
-       struct ast_channel_snapshot *old_snapshot = 
stasis_message_data(update->old_snapshot);
-       /* Pull timestamp from the new snapshot, or from the update message
-        * when there isn't one. */
-       const struct timeval *tv = update->new_snapshot ? 
stasis_message_timestamp(update->new_snapshot) : 
stasis_message_timestamp(message);
-       int i;
-
-       watching_apps = get_apps_watching_channel(new_snapshot ? 
new_snapshot->uniqueid : old_snapshot->uniqueid);
-       if (!watching_apps) {
-               return;
-       }
-
-       for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
-               RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-               msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
-               if (msg) {
-                       ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, 
msg);
-               }
-       }
-}
-
-static void distribute_message(struct ao2_container *apps, struct ast_json 
*msg)
-{
-       ao2_callback(apps, OBJ_NODATA, app_send_cb, msg);
-}
-
-static void sub_channel_blob_handler(void *data,
-               struct stasis_subscription *sub,
-               struct stasis_topic *topic,
-               struct stasis_message *message)
-{
-       RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-       RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
-       struct ast_channel_blob *obj = stasis_message_data(message);
-
-       if (!obj->snapshot) {
-               return;
-       }
-
-       msg = stasis_message_to_json(message);
-       if (!msg) {
-               return;
-       }
-
-       watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid);
-       if (!watching_apps) {
-               return;
-       }
-
-       distribute_message(watching_apps, msg);
-}
 
 /*!
  * \brief In addition to running ao2_cleanup(), this function also removes the
@@ -471,7 +372,7 @@
        ast_bridge_destroy(bridge);
 }
 
-int app_send_start_msg(struct app *app, struct ast_channel *chan,
+static int send_start_msg(struct app *app, struct ast_channel *chan,
        int argc, char *argv[])
 {
        RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
@@ -512,7 +413,7 @@
        return 0;
 }
 
-int app_send_end_msg(struct app *app, struct ast_channel *chan)
+static int send_end_msg(struct app *app, struct ast_channel *chan)
 {
        RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
        RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
@@ -554,6 +455,7 @@
 
        RAII_VAR(struct app *, app, NULL, ao2_cleanup);
        RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
+       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
        int res = 0;
 
        ast_assert(chan != NULL);
@@ -577,15 +479,17 @@
        }
        ao2_link(app_controls, control);
 
-       res = app_send_start_msg(app, chan, argc, argv);
+       res = send_start_msg(app, chan, argc, argv);
        if (res != 0) {
                ast_log(LOG_ERROR,
                        "Error sending start message to '%s'\n", app_name);
-               return res;
-       }
-
-       if (app_add_channel(app, chan)) {
-               ast_log(LOG_ERROR, "Error adding listener for channel %s to app 
%s\n", ast_channel_name(chan), app_name);
+               return -1;
+       }
+
+       forwards = app_subscribe_channel(app, chan);
+       if (!forwards) {
+               ast_log(LOG_ERROR, "Error subscribing app %s to channel %s\n",
+                       app_name, ast_channel_name(chan));
                return -1;
        }
 
@@ -637,8 +541,7 @@
                }
        }
 
-       app_remove_channel(app, chan);
-       res = app_send_end_msg(app, chan);
+       res = send_end_msg(app, chan);
        if (res != 0) {
                ast_log(LOG_ERROR,
                        "Error sending end message to %s\n", app_name);
@@ -749,296 +652,28 @@
        ast_module_unref(ast_module_info->self);
 }
 
-/*! \brief Callback to check whether an app is watching a given bridge */
-static int app_watching_bridge_cb(void *obj, void *arg, int flags)
-{
-       struct app *app = obj;
-       char *uniqueid = arg;
-
-       return app_is_watching_bridge(app, uniqueid) ? CMP_MATCH : 0;
-}
-
-/*! \brief Get a container full of apps that are interested in the specified 
bridge */
-static struct ao2_container *get_apps_watching_bridge(const char *uniqueid)
-{
-       struct ao2_container *watching_apps;
-       char *uniqueid_dup;
-       RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, 
ao2_iterator_destroy);
-       ast_assert(uniqueid != NULL);
-
-       uniqueid_dup = ast_strdupa(uniqueid);
-
-       watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, 
app_watching_bridge_cb, uniqueid_dup);
-       watching_apps = watching_apps_iter->c;
-
-       if (!ao2_container_count(watching_apps)) {
-               return NULL;
-       }
-
-       ao2_ref(watching_apps, +1);
-       return watching_apps_iter->c;
-}
-
-/*! Callback used to remove an app's interest in a bridge */
-static int remove_bridge_cb(void *obj, void *arg, int flags)
-{
-       app_remove_bridge(obj, arg);
-       return 0;
-}
-
-static struct ast_json *simple_bridge_event(
-       const char *type,
-       struct ast_bridge_snapshot *snapshot,
-       const struct timeval *tv)
-{
-       return ast_json_pack("{s: s, s: o, s: o}",
-               "type", type,
-               "timestamp", ast_json_timeval(*tv, NULL),
-               "bridge", ast_bridge_snapshot_to_json(snapshot));
-}
-
-static struct ast_json *simple_bridge_channel_event(
-       const char *type,
-       struct ast_bridge_snapshot *bridge_snapshot,
-       struct ast_channel_snapshot *channel_snapshot,
-       const struct timeval *tv)
-{
-       return ast_json_pack("{s: s, s: o, s: o, s: o}",
-               "type", type,
-               "timestamp", ast_json_timeval(*tv, NULL),
-               "bridge", ast_bridge_snapshot_to_json(bridge_snapshot),
-               "channel", ast_channel_snapshot_to_json(channel_snapshot));
-}
-
-static void sub_bridge_snapshot_handler(void *data,
-               struct stasis_subscription *sub,
-               struct stasis_topic *topic,
-               struct stasis_message *message)
-{
-       RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup);
-       struct stasis_cache_update *update = stasis_message_data(message);
-       struct ast_bridge_snapshot *new_snapshot = 
stasis_message_data(update->new_snapshot);
-       struct ast_bridge_snapshot *old_snapshot = 
stasis_message_data(update->old_snapshot);
-       const struct timeval *tv = update->new_snapshot ? 
stasis_message_timestamp(update->new_snapshot) : 
stasis_message_timestamp(message);
-
-       RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-       watching_apps = get_apps_watching_bridge(new_snapshot ? 
new_snapshot->uniqueid : old_snapshot->uniqueid);
-       if (!watching_apps || !ao2_container_count(watching_apps)) {
-               return;
-       }
-
-       if (!new_snapshot) {
-               RAII_VAR(char *, bridge_id, ast_strdup(old_snapshot->uniqueid), 
ast_free);
-
-               /* The bridge has gone away. Create the message, make sure no 
apps are
-                * watching this bridge anymore, and destroy the bridge's 
control
-                * structure */
-               msg = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
-               ao2_callback(watching_apps, OBJ_NODATA, remove_bridge_cb, 
bridge_id);
-               stasis_app_bridge_destroy(old_snapshot->uniqueid);
-       } else if (!old_snapshot) {
-               msg = simple_bridge_event("BridgeCreated", old_snapshot, tv);
-       }
-
-       if (!msg) {
-               return;
-       }
-
-       distribute_message(watching_apps, msg);
-}
-
-/*! \brief Callback used to merge two containers of applications */
-static int list_merge_cb(void *obj, void *arg, int flags)
-{
-       /* remove any current entries for this app */
-       ao2_find(arg, obj, OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA | 
OBJ_MULTIPLE);
-       /* relink as the only entry */
-       ao2_link(arg, obj);
-       return 0;
-}
-
-/*! \brief Merge container src into container dst without modifying src */
-static void update_apps_list(struct ao2_container *dst, struct ao2_container 
*src)
-{
-       ao2_callback(src, OBJ_NODATA, list_merge_cb, dst);
-}
-
-/*! \brief Callback for adding to an app's bridges of interest */
-static int app_add_bridge_cb(void *obj, void *arg, int flags)
-{
-       app_add_bridge(obj, arg);
-       return 0;
-}
-
-/*! \brief Add interest in the given bridge to all apps in the container */
-static void update_bridge_interest(struct ao2_container *apps, const char 
*bridge_id)
-{
-       RAII_VAR(char *, bridge_id_dup, ast_strdup(bridge_id), ast_free);
-       ao2_callback(apps, OBJ_NODATA, app_add_bridge_cb, bridge_id_dup);
-}
-
-static void sub_bridge_merge_handler(void *data,
-               struct stasis_subscription *sub,
-               struct stasis_topic *topic,
-               struct stasis_message *message)
-{
-       RAII_VAR(struct ao2_container *, watching_apps_to, NULL, ao2_cleanup);
-       RAII_VAR(struct ao2_container *, watching_apps_from, NULL, ao2_cleanup);
-       RAII_VAR(struct ao2_container *, watching_apps_all, 
ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
-       struct ast_bridge_merge_message *merge = stasis_message_data(message);
-       RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-       RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
-       const struct timeval *tv = stasis_message_timestamp(message);
-
-       watching_apps_to = get_apps_watching_bridge(merge->to->uniqueid);
-       if (watching_apps_to) {
-               update_apps_list(watching_apps_all, watching_apps_to);
-       }
-
-       watching_apps_from = get_apps_watching_bridge(merge->from->uniqueid);
-       if (watching_apps_from) {
-               update_bridge_interest(watching_apps_from, merge->to->uniqueid);
-               update_apps_list(watching_apps_all, watching_apps_from);
-       }
-
-       if (!ao2_container_count(watching_apps_all)) {
-               return;
-       }
-
-       msg = ast_json_pack("{s: s, s: o, s: o, s: o}",
-               "type", "BridgeMerged",
-               "timestamp", ast_json_timeval(*tv, NULL),
-               "bridge", ast_bridge_snapshot_to_json(merge->to),
-               "bridge_from", ast_bridge_snapshot_to_json(merge->from));
-
-       if (!msg) {
-               return;
-       }
-
-       distribute_message(watching_apps_all, msg);
-}
-
-static void sub_bridge_enter_handler(void *data,
-               struct stasis_subscription *sub,
-               struct stasis_topic *topic,
-               struct stasis_message *message)
-{
-       RAII_VAR(struct ao2_container *, watching_apps_channel, NULL, 
ao2_cleanup);
-       RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, 
ao2_cleanup);
-       RAII_VAR(struct ao2_container *, watching_apps_all, 
ao2_container_alloc(1, NULL, NULL), ao2_cleanup);
-       struct ast_bridge_blob *obj = stasis_message_data(message);
-       RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-       watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
-       if (watching_apps_bridge) {
-               update_apps_list(watching_apps_all, watching_apps_bridge);
-       }
-
-       watching_apps_channel = 
get_apps_watching_channel(obj->channel->uniqueid);
-       if (watching_apps_channel) {
-               update_bridge_interest(watching_apps_channel, 
obj->bridge->uniqueid);
-               update_apps_list(watching_apps_all, watching_apps_channel);
-       }
-
-       if (!ao2_container_count(watching_apps_all)) {
-               return;
-       }
-
-       msg = simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge,
-               obj->channel, stasis_message_timestamp(message));
-
-       distribute_message(watching_apps_all, msg);
-}
-
-static void sub_bridge_leave_handler(void *data,
-               struct stasis_subscription *sub,
-               struct stasis_topic *topic,
-               struct stasis_message *message)
-{
-       RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, 
ao2_cleanup);
-       struct ast_bridge_blob *obj = stasis_message_data(message);
-       RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
-
-       watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid);
-       if (!watching_apps_bridge) {
-               return;
-       }
-
-       msg = simple_bridge_channel_event("ChannelLeftBridge", obj->bridge,
-               obj->channel, stasis_message_timestamp(message));
-
-       distribute_message(watching_apps_bridge, msg);
-}
-
 static int load_module(void)
 {
-       int r = 0;
-
-       apps_registry =
-               ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
+       apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash,
+               app_compare);
        if (apps_registry == NULL) {
                return AST_MODULE_LOAD_FAILURE;
        }
 
-       app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
-                                            control_hash, control_compare);
+       app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash,
+               control_compare);
        if (app_controls == NULL) {
                return AST_MODULE_LOAD_FAILURE;
        }
 
-       app_bridges = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
-                                            bridges_hash, bridges_compare);
-       if (app_bridges == NULL) {
-               return AST_MODULE_LOAD_FAILURE;
-       }
-
-       channel_router = 
stasis_message_router_create(ast_channel_topic_all_cached());
-       if (!channel_router) {
-               return AST_MODULE_LOAD_FAILURE;
-       }
-
-       r |= stasis_message_router_add(channel_router, 
stasis_cache_update_type(), sub_channel_snapshot_handler, NULL);
-       /* TODO: This could be handled a lot better. Instead of subscribing to
-        * the one caching topic and filtering out messages by channel id, we
-        * should have individual caching topics per-channel, with a shared
-        * back-end cache. That would simplify a lot of what's going on right
-        * here.
-        */
-       r |= stasis_message_router_add(channel_router, 
ast_channel_user_event_type(), sub_channel_blob_handler, NULL);
-       r |= stasis_message_router_add(channel_router, 
ast_channel_varset_type(), sub_channel_blob_handler, NULL);
-       r |= stasis_message_router_add(channel_router, 
ast_channel_dtmf_end_type(), sub_channel_blob_handler, NULL);
-       r |= stasis_message_router_add(channel_router, 
ast_channel_hangup_request_type(), sub_channel_blob_handler, NULL);
-       if (r) {
-               return AST_MODULE_LOAD_FAILURE;
-       }
-
-       bridge_router = 
stasis_message_router_create(ast_bridge_topic_all_cached());
-       if (!bridge_router) {
-               return AST_MODULE_LOAD_FAILURE;
-       }
-
-       r |= stasis_message_router_add(bridge_router, 
stasis_cache_update_type(), sub_bridge_snapshot_handler, NULL);
-       r |= stasis_message_router_add(bridge_router, 
ast_bridge_merge_message_type(), sub_bridge_merge_handler, NULL);
-       r |= stasis_message_router_add(bridge_router, 
ast_channel_entered_bridge_type(), sub_bridge_enter_handler, NULL);
-       r |= stasis_message_router_add(bridge_router, 
ast_channel_left_bridge_type(), sub_bridge_leave_handler, NULL);
-       if (r) {
-               return AST_MODULE_LOAD_FAILURE;
-       }
+        app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash,
+               bridges_compare);
 
        return AST_MODULE_LOAD_SUCCESS;
 }
 
 static int unload_module(void)
 {
-       int r = 0;
-
-       stasis_message_router_unsubscribe_and_join(channel_router);
-       channel_router = NULL;
-
-       stasis_message_router_unsubscribe_and_join(bridge_router);
-       bridge_router = NULL;
-
        ao2_cleanup(apps_registry);
        apps_registry = NULL;
 
@@ -1048,7 +683,7 @@
        ao2_cleanup(app_bridges);
        app_bridges = NULL;
 
-       return r;
+       return 0;
 }
 
 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis 
application support",

Modified: team/dlee/ASTERISK-21969/res/stasis/app.c
URL: 
http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/stasis/app.c?view=diff&rev=396360&r1=396359&r2=396360
==============================================================================
--- team/dlee/ASTERISK-21969/res/stasis/app.c (original)
+++ team/dlee/ASTERISK-21969/res/stasis/app.c Wed Aug  7 15:53:51 2013
@@ -30,49 +30,65 @@
 #include "app.h"
 
 #include "asterisk/stasis_app.h"
+#include "asterisk/stasis_bridges.h"
 #include "asterisk/stasis_channels.h"
-
-/*!
- * \brief Number of buckets for the channels container for app instances.  
Remember
- * to keep it a prime number!
- */
-#define APP_CHANNELS_BUCKETS 7
-
-/*!
- * \brief Number of buckets for the bridges container for app instances.  
Remember
- * to keep it a prime number!
- */
-#define APP_BRIDGES_BUCKETS 7
+#include "asterisk/stasis_message_router.h"
 
 struct app {
+       /*! Atomic integer counting channels in this app */
+       int channel_count;
+       /*! Aggregation topic for this application. */
+       struct stasis_topic *topic;
+       /*! Router for handling messages forwarded to \a topic. */
+       struct stasis_message_router *router;
        /*! Callback function for this application. */
        stasis_app_cb handler;
        /*! Opaque data to hand to callback function. */
        void *data;
-       /*! List of channel identifiers this app instance is interested in */
-       struct ao2_container *channels;
-       /*! List of bridge identifiers this app instance owns */
-       struct ao2_container *bridges;
        /*! Name of the Stasis application */
        char name[];
 };
 
+struct app_forwards {
+       struct app *app;
+
+       struct stasis_subscription *channel_forward;
+       struct stasis_subscription *channel_cached_forward;
+
+       struct stasis_subscription *bridge_forward;
+       struct stasis_subscription *bridge_cached_forward;
+};
+
 static void app_dtor(void *obj)
 {
        struct app *app = obj;
+
+       ao2_cleanup(app->topic);
+       app->topic = NULL;
 
        ao2_cleanup(app->data);
        app->data = NULL;
-       ao2_cleanup(app->channels);
-       app->channels = NULL;
-       ao2_cleanup(app->bridges);
-       app->bridges = NULL;
+}
+
+static void sub_default_handler(void *data, struct stasis_subscription *sub,
+       struct stasis_topic *topic, struct stasis_message *message)
+{
+       struct app *app = data;
+       RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+
+       json = stasis_message_to_json(message);
+       if (!json) {
+               return;
+       }
+
+       app_send(app, json);
 }
 
 struct app *app_create(const char *name, stasis_app_cb handler, void *data)
 {
        RAII_VAR(struct app *, app, NULL, ao2_cleanup);
        size_t size;
+       int res = 0;
 
        ast_assert(name != NULL);
        ast_assert(handler != NULL);
@@ -85,76 +101,53 @@
        if (!app) {
                return NULL;
        }
+
+       app->topic = stasis_topic_create(name);
+       if (!app->topic) {
+               return NULL;
+       }
+
+       app->router = stasis_message_router_create(app->topic);
+       if (!app->router) {
+               return NULL;
+       }
+
+       /*
+       res |= stasis_message_router_add(app->router,
+               ast_channel_user_event_type(), sub_channel_blob_handler, app);
+        res |= stasis_message_router_add(app->router,
+               ast_channel_varset_type(), sub_channel_blob_handler, app);
+        res |= stasis_message_router_add(app->router,
+               ast_channel_dtmf_end_type(), sub_channel_blob_handler, app);
+        res |= stasis_message_router_add(app->router,
+               ast_channel_hangup_request_type(), sub_channel_blob_handler,
+               app);
+        res |= stasis_message_router_add(app->router,
+               stasis_cache_update_type(), sub_bridge_snapshot_handler, app);
+        res |= stasis_message_router_add(app->router,
+               ast_bridge_merge_message_type(), sub_bridge_merge_handler,
+               app);
+        res |= stasis_message_router_add(app->router,
+               ast_channel_entered_bridge_type(), sub_bridge_enter_handler,
+               app);
+        res |= stasis_message_router_add(app->router,
+               ast_channel_left_bridge_type(), sub_bridge_leave_handler, app);
+       */
+       res |= stasis_message_router_set_default(app->router,
+               sub_default_handler, app);
+
+       if (res != 0) {
+               return NULL;
+       }
+
 
        strncpy(app->name, name, size - sizeof(*app));
        app->handler = handler;
        ao2_ref(data, +1);
        app->data = data;
 
-       app->channels = ast_str_container_alloc(APP_CHANNELS_BUCKETS);
-       if (!app->channels) {
-               return NULL;
-       }
-
-       app->bridges = ast_str_container_alloc(APP_BRIDGES_BUCKETS);
-       if (!app->bridges) {
-               return NULL;
-       }
-
        ao2_ref(app, +1);
        return app;
-}
-
-int app_add_channel(struct app *app, const struct ast_channel *chan)
-{
-       SCOPED_AO2LOCK(lock, app);
-       const char *uniqueid;
-
-       ast_assert(app != NULL);
-       ast_assert(chan != NULL);
-
-       /* Don't accept new channels in an inactive application */
-       if (!app->handler) {
-               return -1;
-       }
-
-       uniqueid = ast_channel_uniqueid(chan);
-       return ast_str_container_add(app->channels, uniqueid) ? -1 : 0;
-}
-
-void app_remove_channel(struct app* app, const struct ast_channel *chan)
-{
-       SCOPED_AO2LOCK(lock, app);
-
-       ast_assert(app != NULL);
-       ast_assert(chan != NULL);
-
-       ao2_find(app->channels, ast_channel_uniqueid(chan), OBJ_KEY | 
OBJ_NODATA | OBJ_UNLINK);
-}
-
-int app_add_bridge(struct app *app, const char *uniqueid)
-{
-       SCOPED_AO2LOCK(lock, app);
-
-       ast_assert(app != NULL);
-       ast_assert(uniqueid != NULL);
-
-       /* Don't accept new bridges in an inactive application */
-       if (!app->handler) {
-               return -1;
-       }
-
-       return ast_str_container_add(app->bridges, uniqueid) ? -1 : 0;
-}
-
-void app_remove_bridge(struct app* app, const char *uniqueid)
-{
-       SCOPED_AO2LOCK(lock, app);
-
-       ast_assert(app != NULL);
-       ast_assert(uniqueid != NULL);
-
-       ao2_find(app->bridges, uniqueid, OBJ_KEY | OBJ_NODATA | OBJ_UNLINK | 
OBJ_MULTIPLE);
 }
 
 /*!
@@ -207,7 +200,7 @@
        SCOPED_AO2LOCK(lock, app);
 
        return app->handler == NULL &&
-               ao2_container_count(app->channels) == 0;
+               ast_atomic_fetchadd_int(&app->channel_count, 0) == 0;
 }
 
 void app_update(struct app *app, stasis_app_cb handler, void *data)
@@ -243,16 +236,96 @@
        return app->name;
 }
 
-int app_is_watching_channel(struct app *app, const char *uniqueid)
-{
-       RAII_VAR(char *, found, NULL, ao2_cleanup);
-       found = ao2_find(app->channels, uniqueid, OBJ_KEY);
-       return found != NULL;
-}
-
-int app_is_watching_bridge(struct app *app, const char *uniqueid)
-{
-       RAII_VAR(char *, found, NULL, ao2_cleanup);
-       found = ao2_find(app->bridges, uniqueid, OBJ_KEY);
-       return found != NULL;
-}
+static void forwards_dtor(void *obj)
+{
+       struct app_forwards *forwards = obj;
+
+       stasis_unsubscribe(forwards->channel_forward);
+       forwards->channel_forward = NULL;
+       stasis_unsubscribe(forwards->channel_cached_forward);
+       forwards->channel_cached_forward = NULL;
+       stasis_unsubscribe(forwards->bridge_forward);
+       forwards->bridge_forward = NULL;
+       stasis_unsubscribe(forwards->bridge_cached_forward);
+       forwards->bridge_cached_forward = NULL;
+
+       ast_atomic_fetchadd_int(&forwards->app->channel_count, -1);
+
+       ao2_cleanup(forwards->app);
+       forwards->app = NULL;
+}
+
+struct app_forwards *app_subscribe_channel(struct app *app,
+       struct ast_channel *chan)
+{
+       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
+
+       if (!app || !chan) {
+               return NULL;
+       }
+
+       forwards = ao2_alloc(sizeof(*forwards), forwards_dtor);
+       if (!forwards) {
+               return NULL;
+       }
+
+       ao2_ref(app, +1);
+       forwards->app = app;
+
+       forwards->channel_forward = stasis_forward_all(ast_channel_topic(chan),
+               app->topic);
+       if (!forwards->channel_forward) {
+               return NULL;
+       }
+
+       forwards->channel_cached_forward = stasis_forward_all(
+               ast_channel_topic_cached(chan), app->topic);
+       if (!forwards->channel_cached_forward) {
+               return NULL;
+       }
+
+       ast_atomic_fetchadd_int(&forwards->app->channel_count, +1);
+       ao2_ref(forwards, +1);
+       return forwards;
+}
+
+int app_subscribe_bridge(struct app_forwards *forwards,
+       struct ast_bridge *bridge)
+{
+       if (!forwards || !bridge) {
+               return -1;
+       }
+
+       ast_assert(forwards->app != NULL);
+
+       app_unsubscribe_bridge(forwards);
+
+       forwards->bridge_forward = stasis_forward_all(
+               ast_bridge_topic(bridge), forwards->app->topic);
+       if (!forwards->bridge_forward) {
+               return -1;
+       }
+
+       forwards->bridge_cached_forward = stasis_forward_all(
+               ast_bridge_topic_cached(bridge), forwards->app->topic);
+       if (!forwards->bridge_cached_forward) {
+               /* Probably a bad idea to stay half-subscribed */
+               stasis_unsubscribe(forwards->bridge_forward);
+               forwards->bridge_forward = NULL;
+               return -1;
+       }
+
+       return 0;
+}
+
+void app_unsubscribe_bridge(struct app_forwards *forwards)
+{
+       if (!forwards) {
+               return;
+       }
+
+       stasis_unsubscribe(forwards->bridge_forward);
+       forwards->bridge_forward = NULL;
+       stasis_unsubscribe(forwards->bridge_cached_forward);
+       forwards->bridge_cached_forward = NULL;
+}

Modified: team/dlee/ASTERISK-21969/res/stasis/app.h
URL: 
http://svnview.digium.com/svn/asterisk/team/dlee/ASTERISK-21969/res/stasis/app.h?view=diff&rev=396360&r1=396359&r2=396360
==============================================================================
--- team/dlee/ASTERISK-21969/res/stasis/app.h (original)
+++ team/dlee/ASTERISK-21969/res/stasis/app.h Wed Aug  7 15:53:51 2013
@@ -96,17 +96,6 @@
 const char *app_name(const struct app *app);
 
 /*!
- * \brief Subscribe an application to a topic.
- *
- * \param app Application.
- * \param topic Topic to subscribe to.
- * \return New subscription.
- * \return \c NULL on error.
- */
-struct stasis_subscription *app_subscribe(struct app *app,
-       struct stasis_topic *topic);
-
-/*!
  * \brief Send a message to an application.
  *
  * \param app Application.
@@ -114,83 +103,41 @@
  */
 void app_send(struct app *app, struct ast_json *message);
 
+struct app_forwards;
+
 /*!
- * \brief Send the start message to an application.
+ * \brief Subscribes an application to a channel.
+ *
+ * The returned object is AO2 managed, and should be ao2_cleanup()'ed to kill
+ * the subscriptions.
  *
  * \param app Application.
- * \param chan The channel entering the application.
- * \param argc The number of arguments for the application.
- * \param argv The arguments for the application.
+ * \param chan Channel to subscribe to.
  * \return 0 on success.
  * \return Non-zero on error.
  */
-int app_send_start_msg(struct app *app, struct ast_channel *chan, int argc,
-       char *argv[]);
+struct app_forwards *app_subscribe_channel(struct app *app,
+       struct ast_channel *chan);
 
 /*!
- * \brief Send the end message to an application.
+ * \brief Add a bridge subscription to an existing channel subscription.
  *
- * \param app Application.
- * \param chan The channel leaving the application.
+ * \param forwards Return from app_subscribe_channel().
+ * \param bridge Bridge to subscribe to.
  * \return 0 on success.
  * \return Non-zero on error.
  */
-int app_send_end_msg(struct app *app, struct ast_channel *chan);
+int app_subscribe_bridge(struct app_forwards *forwards,
+       struct ast_bridge *bridge);
 
 /*!
- * \brief Checks if an application is watching a given channel.
+ * \brief Cancel the bridge subscription for an application.
  *
- * \param app Application.
- * \param uniqueid Uniqueid of the channel to check about.
- * \return True (non-zero) if \a app is watching channel with given \a uniqueid
- * \return False (zero) if \a app isn't.
- */
-int app_is_watching_channel(struct app *app, const char *uniqueid);
-
-/*!
- * \brief Add a channel to an application's watch list.
- *
- * \param app Application.
- * \param chan Channel to watch.
+ * \param forwards Return from app_subscribe_channel().
+ * \param bridge Bridge to subscribe to.
  * \return 0 on success.
  * \return Non-zero on error.
  */
-int app_add_channel(struct app *app, const struct ast_channel *chan);
-
-/*!
- * \brief Remove a channel from an application's watch list.
- *
- * \param app Application.
- * \param chan Channel to watch.
- */
-void app_remove_channel(struct app *app, const struct ast_channel *chan);
-
-/*!
- * \brief Add a bridge to an application's watch list by uniqueid.
- *
- * \param app Application.
- * \param bridge Bridge to watch.
- * \return 0 on success.
- * \return Non-zero on error.
- */
-int app_add_bridge(struct app *app, const char *uniqueid);
-
-/*!
- * \brief Remove a bridge from an application's watch list by uniqueid.
- *
- * \param app Application.
- * \param bridge Bridge to remove.
- */
-void app_remove_bridge(struct app* app, const char *uniqueid);
-
-/*!
- * \brief Checks if an application is watching a given bridge.
- *
- * \param app Application.
- * \param uniqueid Uniqueid of the bridge to check.
- * \return True (non-zero) if \a app is watching bridge with given \a uniqueid
- * \return False (zero) if \a app isn't.
- */
-int app_is_watching_bridge(struct app *app, const char *uniqueid);
+void app_unsubscribe_bridge(struct app_forwards *forwards);
 
 #endif /* _ASTERISK_RES_STASIS_APP_H */


--
_____________________________________________________________________
-- Bandwidth and Colocation Provided by http://www.api-digital.com --

svn-commits mailing list
To UNSUBSCRIBE or update options visit:
   http://lists.digium.com/mailman/listinfo/svn-commits

Reply via email to