Author: mjordan
Date: Mon Mar  9 09:13:38 2015
New Revision: 432635

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=432635
Log:
Initial commit

It mostly lives, breathes, and works. At least with PJSIP.

Modified:
    team/mjordan/trunk-astdb-cluster/funcs/func_db.c
    team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h
    team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h
    team/mjordan/trunk-astdb-cluster/main/db.c
    team/mjordan/trunk-astdb-cluster/main/utils.c
    team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c
    team/mjordan/trunk-astdb-cluster/tests/test_db.c

Modified: team/mjordan/trunk-astdb-cluster/funcs/func_db.c
URL: 
http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/funcs/func_db.c?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/funcs/func_db.c (original)
+++ team/mjordan/trunk-astdb-cluster/funcs/func_db.c Mon Mar  9 09:13:38 2015
@@ -1,7 +1,7 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 2005-2006, Russell Bryant <russ...@clemson.edu> 
+ * Copyright (C) 2005-2015, Russell Bryant <russ...@clemson.edu> 
  *
  * func_db.c adapted from the old app_db.c, copyright by the following people 
  * Copyright (C) 2005, Mark Spencer <marks...@digium.com>
@@ -23,6 +23,7 @@
  * \brief Functions for interaction with the Asterisk database
  *
  * \author Russell Bryant <russ...@clemson.edu>
+ * \author Matt Jordan <mjor...@digium.com>
  *
  * \ingroup functions
  */
@@ -52,6 +53,30 @@
                <syntax argsep="/">
                        <parameter name="family" required="true" />
                        <parameter name="key" required="true" />
+               </syntax>
+               <description>
+                       <para>This function will read from or write a value to 
the Asterisk database.  On a
+                       read, this function returns the corresponding value 
from the database, or blank
+                       if it does not exist.  Reading a database value will 
also set the variable
+                       DB_RESULT.  If you wish to find out if an entry exists, 
use the DB_EXISTS
+                       function.</para>
+               </description>
+               <see-also>
+                       <ref type="application">DBdel</ref>
+                       <ref type="function">DB_DELETE</ref>
+                       <ref type="application">DBdeltree</ref>
+                       <ref type="function">DB_EXISTS</ref>
+               </see-also>
+       </function>
+       <function name="DB_SHARED" language="en_US">
+               <synopsis>
+                       Create or delete a shared family in the Asterisk 
database.
+               </synopsis>
+               <syntax argsep="/">
+                       <parameter name="action" required="true">
+                       </parameter>
+                       <parameter name="type">
+                       </parameter>
                </syntax>
                <description>
                        <para>This function will read from or write a value to 
the Asterisk database.  On a
@@ -200,14 +225,14 @@
        buf[0] = '\0';
 
        if (ast_strlen_zero(parse)) {
-               ast_log(LOG_WARNING, "DB_EXISTS requires an argument, 
DB(<family>/<key>)\n");
+               ast_log(LOG_WARNING, "DB_EXISTS requires an argument, 
DB_EXISTS(<family>/<key>)\n");
                return -1;
        }
 
        AST_NONSTANDARD_APP_ARGS(args, parse, '/');
 
        if (args.argc < 2) {
-               ast_log(LOG_WARNING, "DB_EXISTS requires an argument, 
DB(<family>/<key>)\n");
+               ast_log(LOG_WARNING, "DB_EXISTS requires an argument, 
DB_EXISTS(<family>/<key>)\n");
                return -1;
        }
 
@@ -335,6 +360,106 @@
        .write = function_db_delete_write,
 };
 
+static int function_db_shared_exists_read(struct ast_channel *chan,
+       const char *cmd, char *parse, char *buf, size_t len)
+{
+       AST_DECLARE_APP_ARGS(args,
+               AST_APP_ARG(family);
+       );
+
+       buf[0] = '\0';
+
+       if (ast_strlen_zero(parse)) {
+               ast_log(LOG_WARNING, "DB_SHARED_EXISTS requires an argument, 
DB_SHARED_EXISTS(<family>)\n");
+               return -1;
+       }
+
+       AST_STANDARD_APP_ARGS(args, parse);
+
+       if (args.argc != 1) {
+               ast_log(LOG_WARNING, "DB_SHARED_EXISTS requires an argument, 
DB_SHARED_EXISTS(<family>)\n");
+               return -1;
+       }
+
+       if (ast_db_is_shared(args.family)) {
+               ast_copy_string(buf, "1", len);
+       } else {
+               ast_copy_string(buf, "0", len);
+       }
+       pbx_builtin_setvar_helper(chan, "DB_RESULT", buf);
+
+       return 0;
+}
+
+static struct ast_custom_function db_shared_exists_function = {
+       .name = "DB_SHARED_EXISTS",
+       .read = function_db_shared_exists_read,
+};
+
+static int function_db_shared_write(struct ast_channel *chan, const char *cmd, 
char *parse,
+       const char *value)
+{
+       enum ast_db_shared_type share_type;
+
+       AST_DECLARE_APP_ARGS(args,
+               AST_APP_ARG(action);
+               AST_APP_ARG(type);
+       );
+
+       if (ast_strlen_zero(parse)) {
+               ast_log(LOG_WARNING, "DB_SHARED requires an argument, 
DB_SHARED(<action>[,<type>])=<family>\n");
+               return -1;
+       }
+
+       AST_STANDARD_APP_ARGS(args, parse);
+
+       if (args.argc < 1) {
+               ast_log(LOG_WARNING, "DB_SHARED requires an argument, 
DB_SHARED(<action>[,<type>])=<family>\n");
+               return -1;
+       }
+
+       if (ast_strlen_zero(value)) {
+               ast_log(LOG_WARNING, "DB_SHARED requires a value, 
DB_SHARED(<action>[,<type>])=<family>\n");
+               return -1;
+       }
+
+       if (!strcasecmp(args.action, "put")) {
+               if (ast_strlen_zero(args.type) || !strcasecmp(args.type, 
"global")) {
+                       share_type = SHARED_DB_TYPE_GLOBAL;
+               } else if (!strcasecmp(args.type, "unique")) {
+                       share_type = SHARED_DB_TYPE_UNIQUE;
+               } else {
+                       ast_log(LOG_WARNING, "DB_SHARED: Unknown 'type' %s\n", 
args.type);
+                       return -1;
+               }
+
+               if (ast_db_put_shared(value, share_type)) {
+                       /* Generally, failure is benign (key exists) */
+                       ast_debug(2, "Failed to create shared family '%s'\n", 
value);
+               } else {
+                       ast_verb(4, "Created %s shared family '%s'\n",
+                               share_type == SHARED_DB_TYPE_GLOBAL ? "GLOBAL" 
: "UNIQUE",
+                               value);
+               }
+       } else if (!strcasecmp(args.action, "delete")) {
+               if (ast_db_del_shared(value)) {
+                       /* Generally, failure is benign (key doesn't exist) */
+                       ast_debug(2, "Failed to delete shared family '%s'\n", 
value);
+               } else {
+                       ast_verb(4, "Deleted shared family '%s'\n", value);
+               }
+       } else {
+               ast_log(LOG_WARNING, "DB_SHARED: Unknown 'action' %s\n", 
args.action);
+       }
+
+       return 0;
+}
+
+static struct ast_custom_function db_shared_function = {
+       .name = "DB_SHARED",
+       .write = function_db_shared_write,
+};
+
 static int unload_module(void)
 {
        int res = 0;
@@ -343,6 +468,8 @@
        res |= ast_custom_function_unregister(&db_exists_function);
        res |= ast_custom_function_unregister(&db_delete_function);
        res |= ast_custom_function_unregister(&db_keys_function);
+       res |= ast_custom_function_unregister(&db_shared_function);
+       res |= ast_custom_function_unregister(&db_shared_exists_function);
 
        return res;
 }
@@ -355,6 +482,8 @@
        res |= ast_custom_function_register(&db_exists_function);
        res |= ast_custom_function_register_escalating(&db_delete_function, 
AST_CFE_READ);
        res |= ast_custom_function_register(&db_keys_function);
+       res |= ast_custom_function_register_escalating(&db_shared_function, 
AST_CFE_WRITE);
+       res |= ast_custom_function_register(&db_shared_exists_function);
 
        return res;
 }

Modified: team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h
URL: 
http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h (original)
+++ team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h Mon Mar  9 
09:13:38 2015
@@ -28,11 +28,73 @@
 extern "C" {
 #endif
 
+#include "asterisk/utils.h"
+
+enum ast_db_shared_type {
+       /*! Items in the shared family are common across all Asterisk instances 
*/
+       SHARED_DB_TYPE_GLOBAL = 0,
+       /*! Items in the shared family are made unique across all Asterisk 
instances */
+       SHARED_DB_TYPE_UNIQUE,
+};
+
 struct ast_db_entry {
        struct ast_db_entry *next;
        char *key;
        char data[0];
 };
+
+struct stasis_topic;
+struct stasis_message_type;
+
+struct ast_db_shared_family {
+       /*! How the family is shared */
+       enum ast_db_shared_type share_type;
+       /*! Entries in the family, if appropriate */
+       struct ast_db_entry *entries;
+       /*! The name of the shared family */
+       char name[0];
+};
+
+struct ast_db_entry *ast_db_entry_create(const char *key, const char *value);
+
+struct ast_db_shared_family *ast_db_shared_family_alloc(const char *family, 
enum ast_db_shared_type share_type);
+
+int ast_db_publish_shared_message(struct stasis_message_type *type, struct 
ast_db_shared_family *shared_family, struct ast_eid *eid);
+
+void ast_db_refresh_shared(void);
+
+/*! \addtogroup StasisTopicsAndMessages
+ * @{
+ */
+
+struct stasis_topic *ast_db_cluster_topic(void);
+
+/*!
+ * \since 14
+ * \brief Message type for entries put into a shared AstDB family
+ *
+ * \retval A stasis message type
+ */
+struct stasis_message_type *ast_db_put_shared_type(void);
+
+/*!
+ * \since 14
+ * \brief Message type for a shared AstDB family, or entries in it, being deleted
+ *
+ * \retval A stasis message type
+ */
+struct stasis_message_type *ast_db_del_shared_type(void);
+
+/*! @} */
+
+/*!
+ * \brief Create a shared family in the AstDB
+ */
+int ast_db_put_shared(const char *family, enum ast_db_shared_type);
+
+int ast_db_del_shared(const char *family);
+
+int ast_db_is_shared(const char *family);
 
 /*! \brief Get key value specified by family/key */
 int ast_db_get(const char *family, const char *key, char *value, int valuelen);

Modified: team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h
URL: 
http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h (original)
+++ team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h Mon Mar  9 
09:13:38 2015
@@ -922,7 +922,7 @@
  * \brief Convert an EID to a string
  * \since 1.6.1
  */
-char *ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid);
+char *ast_eid_to_str(char *s, int maxlen, const struct ast_eid *eid);
 
 /*!
  * \brief Convert a string into an EID

Modified: team/mjordan/trunk-astdb-cluster/main/db.c
URL: 
http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/main/db.c?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/main/db.c (original)
+++ team/mjordan/trunk-astdb-cluster/main/db.c Mon Mar  9 09:13:38 2015
@@ -1,7 +1,7 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 1999 - 2005, Digium, Inc.
+ * Copyright (C) 1999 - 2015, Digium, Inc.
  *
  * Mark Spencer <marks...@digium.com>
  *
@@ -22,9 +22,6 @@
  *
  * \author Mark Spencer <marks...@digium.com>
  *
- * \note DB3 is licensed under Sleepycat Public License and is thus 
incompatible
- * with GPL.  To avoid having to make another exception (and complicate
- * licensing even further) we elect to use DB1 which is BSD licensed
  */
 
 /*** MODULEINFO
@@ -45,11 +42,12 @@
 #include <dirent.h>
 #include <sqlite3.h>
 
-#include "asterisk/channel.h"
 #include "asterisk/file.h"
+#include "asterisk/utils.h"
+#include "asterisk/astdb.h"
+#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
 #include "asterisk/app.h"
-#include "asterisk/dsp.h"
-#include "asterisk/astdb.h"
 #include "asterisk/cli.h"
 #include "asterisk/utils.h"
 #include "asterisk/manager.h"
@@ -114,7 +112,16 @@
 static int doexit;
 static int dosync;
 
+/*! \brief A container of families to share across Asterisk instances */
+static struct ao2_container *shared_families;
+
+static struct stasis_topic *db_cluster_topic;
+
+static struct stasis_message_router *message_router;
+
 static void db_sync(void);
+
+#define SHARED_FAMILY "__asterisk_shared_family"
 
 #define DEFINE_SQL_STATEMENT(stmt,sql) static sqlite3_stmt *stmt; \
        const char stmt##_sql[] = sql;
@@ -199,6 +206,76 @@
 
        return res;
 }
+
+struct ast_db_entry *ast_db_entry_create(const char *key, const char *value)
+{
+       struct ast_db_entry *entry;
+
+       entry = ast_malloc(sizeof(*entry) + strlen(key) + strlen(value) + 2);
+       if (!entry) {
+               return NULL;
+       }
+       entry->next = NULL;
+       entry->key = entry->data + strlen(value) + 1;
+       strcpy(entry->data, value); /* safe */
+       strcpy(entry->key, key); /* safe */
+
+       return entry;
+}
+
+static void shared_db_family_dtor(void *obj)
+{
+       struct ast_db_shared_family *family = obj;
+
+       ast_db_freetree(family->entries);
+}
+
+struct ast_db_shared_family *ast_db_shared_family_alloc(const char *family, 
enum ast_db_shared_type share_type)
+{
+       struct ast_db_shared_family *shared_family;
+
+       shared_family = ao2_alloc_options(sizeof(*shared_family) + 
strlen(family) + 1,
+               shared_db_family_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
+       if (!shared_family) {
+               return NULL;
+       }
+       strcpy(shared_family->name, family); /* safe: sized for name + NUL above */
+       shared_family->share_type = share_type;
+
+       return shared_family;
+}
+
+static struct ast_db_shared_family *db_shared_family_clone(const struct 
ast_db_shared_family *shared_family)
+{
+       struct ast_db_shared_family *clone;
+
+       clone = ast_db_shared_family_alloc(shared_family->name, 
shared_family->share_type);
+
+       return clone;
+}
+
+static int db_shared_family_sort_fn(const void *obj_left, const void 
*obj_right, int flags)
+{
+       const struct ast_db_shared_family *left = obj_left;
+       const struct ast_db_shared_family *right = obj_right;
+       const char *right_key = obj_right;
+       int cmp;
+
+       switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
+       default:
+       case OBJ_POINTER:
+               right_key = right->name;
+               /* Fall through */
+       case OBJ_KEY:
+               cmp = strcmp(left->name, right_key);
+               break;
+       case OBJ_PARTIAL_KEY:
+               cmp = strncmp(left->name, right_key, strlen(right_key));
+               break;
+       }
+       return cmp;
+}
+
 
 static int db_create_astdb(void)
 {
@@ -308,7 +385,152 @@
        return db_execute_sql("ROLLBACK", NULL, NULL);
 }
 
-int ast_db_put(const char *family, const char *key, const char *value)
+static int db_put_common(const char *family, const char *key, const char 
*value, int share);
+
+int ast_db_put_shared(const char *family, enum ast_db_shared_type share_type)
+{
+       struct ast_db_shared_family *shared_family;
+
+       if (ast_strlen_zero(family)) {
+               return -1;
+       }
+
+       ao2_lock(shared_families);
+
+       shared_family = ao2_find(shared_families, family, OBJ_SEARCH_KEY | 
OBJ_NOLOCK);
+       if (shared_family) {
+               ao2_ref(shared_family, -1);
+               ao2_unlock(shared_families);
+               return -1;
+       }
+
+       shared_family = ast_db_shared_family_alloc(family, share_type);
+       if (!shared_family) {
+               ao2_unlock(shared_families);
+               return -1;
+       }
+
+       ao2_link_flags(shared_families, shared_family, OBJ_NOLOCK);
+
+       db_put_common(SHARED_FAMILY, shared_family->name,
+               share_type == SHARED_DB_TYPE_UNIQUE ? "UNIQUE" : "GLOBAL", 0);
+
+       ao2_ref(shared_family, -1);
+
+       ao2_unlock(shared_families);
+
+       return 0;
+}
+
+int ast_db_is_shared(const char *family)
+{
+       struct ast_db_shared_family *shared_family;
+       int res = 0;
+
+       shared_family = ao2_find(shared_families, family, OBJ_SEARCH_KEY);
+       if (shared_family) {
+               res = 1;
+               ao2_ref(shared_family, -1);
+       }
+
+       return res;
+}
+
+static int db_put_shared(const char *family, const char *key, const char 
*value)
+{
+       struct ast_db_shared_family *shared_family;
+       struct ast_db_shared_family *clone;
+
+       /* See if we are shared */
+       shared_family = ao2_find(shared_families, family, 
OBJ_SEARCH_PARTIAL_KEY);
+       if (!shared_family) {
+               return 0;
+       }
+
+       /* Create a Stasis message for the new item */
+       clone = db_shared_family_clone(shared_family);
+       if (!clone) {
+               ao2_ref(shared_family, -1);
+               return -1;
+       }
+       clone->entries = ast_db_entry_create(key, value);
+       if (!clone->entries) {
+               ao2_ref(shared_family, -1);
+               ao2_ref(clone, -1);
+               return -1;
+       }
+
+       /* Publish, then drop our clone reference as ast_db_refresh_shared() does */
+       ast_db_publish_shared_message(ast_db_put_shared_type(), clone, NULL);
+       ao2_ref(clone, -1);
+       ao2_ref(shared_family, -1);
+
+       return 0;
+}
+
+static int db_del_shared(const char *family, const char *key)
+{
+       struct ast_db_shared_family *shared_family;
+       struct ast_db_shared_family *clone;
+
+       /* See if we are shared */
+       shared_family = ao2_find(shared_families, family, 
OBJ_SEARCH_PARTIAL_KEY);
+       if (!shared_family) {
+               return 0;
+       }
+
+       if (ast_strlen_zero(key)) {
+               clone = ao2_bump(shared_family);
+       } else {
+               clone = db_shared_family_clone(shared_family);
+               if (!clone) {
+                       ao2_ref(shared_family, -1);
+                       return -1;
+               }
+               clone->entries = ast_db_entry_create(key, "");
+               if (!clone->entries) {
+                       ao2_ref(shared_family, -1);
+                       ao2_ref(clone, -1);
+                       return -1;
+               }
+       }
+
+       /* Publish, then drop the clone (or bumped) reference taken above */
+       ast_db_publish_shared_message(ast_db_del_shared_type(), clone, NULL);
+       ao2_ref(clone, -1);
+       ao2_ref(shared_family, -1);
+
+       return 0;
+}
+
+static int db_del_common(const char *family, const char *key, int share);
+
+int ast_db_del_shared(const char *family)
+{
+       struct ast_db_shared_family *shared_family;
+       int res = 0;
+
+       if (ast_strlen_zero(family)) {
+               return -1;
+       }
+
+       ao2_lock(shared_families);
+
+       shared_family = ao2_find(shared_families, family, OBJ_SEARCH_KEY | 
OBJ_NOLOCK);
+       if (shared_family) {
+               ao2_unlink_flags(shared_families, shared_family, OBJ_NOLOCK);
+               db_del_common(SHARED_FAMILY, shared_family->name, 0);
+               ao2_ref(shared_family, -1);
+       } else {
+               res = -1;
+       }
+
+       ao2_unlock(shared_families);
+
+       return res;
+}
+
+static int db_put_common(const char *family, const char *key, const char 
*value, int share)
 {
        char fullkey[MAX_DB_FIELD];
        size_t fullkey_len;
@@ -335,9 +557,17 @@
 
        sqlite3_reset(put_stmt);
        db_sync();
+       if (share) {
+               db_put_shared(family, key, value);
+       }
        ast_mutex_unlock(&dblock);
 
        return res;
+}
+
+int ast_db_put(const char *family, const char *key, const char *value)
+{
+       return db_put_common(family, key, value, 1);
 }
 
 /*!
@@ -410,7 +640,7 @@
        return db_get_common(family, key, out, -1);
 }
 
-int ast_db_del(const char *family, const char *key)
+static int db_del_common(const char *family, const char *key, int share)
 {
        char fullkey[MAX_DB_FIELD];
        size_t fullkey_len;
@@ -433,12 +663,20 @@
        }
        sqlite3_reset(del_stmt);
        db_sync();
+       if (share) {
+               db_del_shared(family, key);
+       }
        ast_mutex_unlock(&dblock);
 
-       return res;
-}
-
-int ast_db_deltree(const char *family, const char *keytree)
+       return res;     
+}
+
+int ast_db_del(const char *family, const char *key)
+{
+       return db_del_common(family, key, 1);
+}
+
+static int db_deltree_common(const char *family, const char *keytree, int 
share)
 {
        sqlite3_stmt *stmt = deltree_stmt;
        char prefix[MAX_DB_FIELD];
@@ -468,9 +706,17 @@
        res = sqlite3_changes(astdb);
        sqlite3_reset(stmt);
        db_sync();
+       if (share) {
+               db_del_shared(prefix, NULL);
+       }
        ast_mutex_unlock(&dblock);
 
-       return res;
+       return res;     
+}
+
+int ast_db_deltree(const char *family, const char *keytree)
+{
+       return db_deltree_common(family, keytree, 1);
 }
 
 struct ast_db_entry *ast_db_gettree(const char *family, const char *keytree)
@@ -508,13 +754,10 @@
                if (!(value_s = (const char *) sqlite3_column_text(stmt, 1))) {
                        break;
                }
-               if (!(cur = ast_malloc(sizeof(*cur) + strlen(key_s) + 
strlen(value_s) + 2))) {
+               cur = ast_db_entry_create(key_s, value_s);
+               if (!cur) {
                        break;
                }
-               cur->next = NULL;
-               cur->key = cur->data + strlen(value_s) + 1;
-               strcpy(cur->data, value_s);
-               strcpy(cur->key, key_s);
                if (last) {
                        last->next = cur;
                } else {
@@ -748,14 +991,26 @@
 
        while (sqlite3_step(showkey_stmt) == SQLITE_ROW) {
                const char *key_s, *value_s;
+               char *family_s;
+               char *delim;
+
                if (!(key_s = (const char *) sqlite3_column_text(showkey_stmt, 
0))) {
                        break;
                }
                if (!(value_s = (const char *) 
sqlite3_column_text(showkey_stmt, 1))) {
                        break;
                }
+               family_s = ast_strdup(key_s);
+               if (!family_s) {
+                       break;
+               }
+               delim = strchr(family_s + 1, '/');
+               *delim = '\0';
+
                ++counter;
-               ast_cli(a->fd, "%-50s: %-25s\n", key_s, value_s);
+               ast_cli(a->fd, "%-50s: %-25s %s\n", key_s, value_s,
+                       ast_db_is_shared(family_s + 1) ? "(S)" : "");
+               ast_free(family_s);
        }
        sqlite3_reset(showkey_stmt);
        ast_mutex_unlock(&dblock);
@@ -984,6 +1239,197 @@
        return NULL;
 }
 
+int ast_db_publish_shared_message(struct stasis_message_type *type, struct 
ast_db_shared_family *shared_family, struct ast_eid *eid)
+{
+       struct stasis_message *message;
+
+       /* Aggregate doesn't really apply to the AstDB; as such, if we aren't
+        * provided an EID use our own.
+        */
+       if (!eid) {
+               eid = &ast_eid_default;
+       }
+
+       message = stasis_message_create_full(type, shared_family, eid);
+       if (!message) {
+               return -1;
+       }
+
+       stasis_publish(ast_db_cluster_topic(), message);
+       ao2_ref(message, -1); /* stasis_publish() does not consume our reference */
+       return 0;
+}
+
+void ast_db_refresh_shared(void)
+{
+       struct ao2_iterator it_shared_families;
+       struct ast_db_shared_family *shared_family;
+
+       it_shared_families = ao2_iterator_init(shared_families, 0);
+       while ((shared_family = ao2_iterator_next(&it_shared_families))) {
+               struct ast_db_shared_family *clone;
+
+               clone = db_shared_family_clone(shared_family);
+               if (!clone) {
+                       ao2_ref(shared_family, -1);
+                       continue;
+               }
+
+               clone->entries = ast_db_gettree(shared_family->name, "");
+               if (!clone->entries) {
+                       ao2_ref(clone, -1);
+                       ao2_ref(shared_family, -1);
+                       continue;
+               }
+
+               ast_db_publish_shared_message(ast_db_put_shared_type(), clone, 
NULL);
+
+               ao2_ref(clone, -1);
+               ao2_ref(shared_family, -1);
+       }
+       ao2_iterator_destroy(&it_shared_families);      
+}
+
+static struct ast_event *db_del_shared_type_to_event(struct stasis_message 
*message)
+{
+       return NULL;
+}
+
+static struct ast_json *db_entries_to_json(struct ast_db_entry *entry)
+{
+       struct ast_json *json;
+       struct ast_db_entry *cur;
+
+       json = ast_json_array_create();
+       if (!json) {
+               return NULL;
+       }
+
+       for (cur = entry; cur; cur = cur->next) {
+               struct ast_json *json_entry;
+
+               json_entry = ast_json_pack("{s: s, s: s}",
+                       "key", cur->key,
+                       "data", cur->data);
+               if (!json_entry) {
+                       ast_json_unref(json);
+                       return NULL;
+               }
+
+               if (ast_json_array_append(json, json_entry)) {
+                       ast_json_unref(json);
+                       return NULL;
+               }
+       }
+
+       return json;
+}
+
+static struct ast_json *db_shared_family_to_json(struct stasis_message 
*message,
+       const struct stasis_message_sanitizer *sanitize)
+{
+       struct stasis_message_type *type = stasis_message_type(message);
+       struct ast_db_shared_family *shared_family;
+
+       shared_family = stasis_message_data(message);
+       if (!shared_family) {
+               return NULL;
+       }
+
+       return ast_json_pack("{s: s, s: s, s: s, s: o}",
+               "verb", type == ast_db_put_shared_type() ? "put" : "delete",
+               "family", shared_family->name,
+               "share_type", shared_family->share_type == 
SHARED_DB_TYPE_UNIQUE ? "unique" : "global",
+               "entries", shared_family->entries ? 
db_entries_to_json(shared_family->entries) : ast_json_null());
+}
+
+static struct ast_event *db_put_shared_type_to_event(struct stasis_message 
*message)
+{
+       return NULL;
+}
+
+struct stasis_topic *ast_db_cluster_topic(void)
+{
+       return db_cluster_topic;
+}
+
+STASIS_MESSAGE_TYPE_DEFN(ast_db_put_shared_type,
+               .to_event = db_put_shared_type_to_event,
+               .to_json = db_shared_family_to_json,
+       );
+STASIS_MESSAGE_TYPE_DEFN(ast_db_del_shared_type,
+               .to_event = db_del_shared_type_to_event,
+               .to_json = db_shared_family_to_json,
+       );
+
+static void db_put_shared_msg_cb(void *data, struct stasis_subscription *sub, 
struct stasis_message *message)
+{
+       struct ast_db_shared_family *shared_family;
+       struct ast_db_shared_family *shared_check;
+       struct ast_db_entry *cur;
+       const struct ast_eid *eid;
+       char *family_id;
+
+       shared_family = stasis_message_data(message);
+       if (!shared_family) {
+               return;
+       }
+
+       eid = stasis_message_eid(message);
+       if (!eid || !ast_eid_cmp(eid, &ast_eid_default)) {
+               return;
+       }
+
+       /* Don't update if we don't have this area shared on this server */
+       shared_check = ao2_find(shared_families, shared_family->name, OBJ_KEY);
+       if (!shared_check) {
+               return;
+       }
+       ao2_ref(shared_check, -1);
+
+       if (shared_family->share_type == SHARED_DB_TYPE_UNIQUE) {
+               char eid_workspace[20];
+
+               /* Length is family + '/' + EID length (20) + 1 */
+               family_id = ast_alloca(strlen(shared_family->name) + 22);
+               ast_eid_to_str(eid_workspace, sizeof(eid_workspace), eid);
+               sprintf(family_id, "%s/%s", eid_workspace, 
shared_family->name); /* safe */
+       } else {
+               family_id = shared_family->name;
+       }
+
+       for (cur = shared_family->entries; cur; cur = cur->next) {
+               db_put_common(family_id, cur->key, cur->data, 0);
+       }
+}
+
+static void db_del_shared_msg_cb(void *data, struct stasis_subscription *sub, 
struct stasis_message *message)
+{
+       struct ast_db_shared_family *shared_family;
+       struct ast_db_entry *cur;
+       const struct ast_eid *eid;
+
+       shared_family = stasis_message_data(message);
+       if (!shared_family) {
+               return;
+       }
+
+       eid = stasis_message_eid(message);
+       if (!eid || !ast_eid_cmp(eid, &ast_eid_default)) {
+               return;
+       }
+
+       cur = shared_family->entries;
+       if (!cur) {
+               db_deltree_common(shared_family->name, NULL, 0);
+               return;
+       }
+
+       for (; cur; cur = cur->next) {
+               db_del_common(shared_family->name, cur->key, 0);
+       }
+}
+
 /*!
  * \internal
  * \brief Clean up resources on Asterisk shutdown
@@ -995,6 +1441,11 @@
        ast_manager_unregister("DBPut");
        ast_manager_unregister("DBDel");
        ast_manager_unregister("DBDelTree");
+
+       ao2_cleanup(db_cluster_topic);
+       db_cluster_topic = NULL;
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_db_put_shared_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_db_del_shared_type);
 
        /* Set doexit to 1 to kill thread. db_sync must be called with
         * mutex held. */
@@ -1005,6 +1456,10 @@
 
        pthread_join(syncthread, NULL);
        ast_mutex_lock(&dblock);
+
+       ao2_ref(shared_families, -1);
+       shared_families = NULL;
+
        clean_statements();
        if (sqlite3_close(astdb) == SQLITE_OK) {
                astdb = NULL;
@@ -1014,12 +1469,43 @@
 
 int astdb_init(void)
 {
+       shared_families = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
+               AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT | 
AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
+               db_shared_family_sort_fn, NULL);
+       if (!shared_families) {
+               return -1;
+       }
+
+       db_cluster_topic = stasis_topic_create("ast_db_cluster_topic");
+       if (!db_cluster_topic) {
+               ao2_ref(shared_families, -1);
+               return -1;
+       }
+
+       STASIS_MESSAGE_TYPE_INIT(ast_db_put_shared_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_db_del_shared_type);
+
+       message_router = 
stasis_message_router_create_pool(ast_db_cluster_topic());
+       if (!message_router) {
+               ao2_ref(db_cluster_topic, -1);
+               ao2_ref(shared_families, -1);
+               return -1;
+       }
+       stasis_message_router_add(message_router, ast_db_put_shared_type(),
+               db_put_shared_msg_cb, NULL);
+       stasis_message_router_add(message_router, ast_db_del_shared_type(),
+               db_del_shared_msg_cb, NULL);
+
        if (db_init()) {
+               ao2_ref(db_cluster_topic, -1);
+               ao2_ref(shared_families, -1);
                return -1;
        }
 
        ast_cond_init(&dbcond, NULL);
        if (ast_pthread_create_background(&syncthread, NULL, db_sync_thread, 
NULL)) {
+               ao2_ref(db_cluster_topic, -1);
+               ao2_ref(shared_families, -1);
                return -1;
        }
 

Modified: team/mjordan/trunk-astdb-cluster/main/utils.c
URL: 
http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/main/utils.c?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/main/utils.c (original)
+++ team/mjordan/trunk-astdb-cluster/main/utils.c Mon Mar  9 09:13:38 2015
@@ -2691,7 +2691,7 @@
 }
 #endif /* defined(AST_DEVMODE) */
 
-char *ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid)
+char *ast_eid_to_str(char *s, int maxlen, const struct ast_eid *eid)
 {
        int x;
        char *os = s;

Modified: team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c
URL: 
http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c?view=diff&rev=432635&r1=432634&r2=432635
==============================================================================
--- team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c (original)
+++ team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c Mon Mar  
9 09:13:38 2015
@@ -36,6 +36,7 @@
 #include "asterisk/module.h"
 #include "asterisk/logger.h"
 #include "asterisk/app.h"
+#include "asterisk/astdb.h"
 
 /*** DOCUMENTATION
        <configInfo name="res_pjsip_publish_asterisk" language="en_US">
@@ -58,6 +59,9 @@
                                <configOption name="mailboxstate_publish">
                                        <synopsis>Optional name of a publish 
item that can be used to publish a request for full mailbox state 
information.</synopsis>
                                </configOption>
+                               <configOption name="dbstate_publish">
+                                       <synopsis>Optional name of a publish 
item that can be used to publish a request for full AstDB state 
information.</synopsis>
+                               </configOption>
                                <configOption name="device_state" default="no">
                                        <synopsis>Whether we should permit 
incoming device state events.</synopsis>
                                </configOption>
@@ -70,6 +74,12 @@
                                <configOption name="mailbox_state_filter">
                                        <synopsis>Optional regular expression 
used to filter what mailboxes we accept events for.</synopsis>
                                </configOption>
+                               <configOption name="db_state" default="no">
+                                       <synopsis>Whether we should permit 
incoming AstDB state events.</synopsis>
+                               </configOption>
+                               <configOption name="db_state_filter">
+                                       <synopsis>Optional regular expression 
used to filter what AstDB families we accept events for.</synopsis>
+                               </configOption>
                                <configOption name="type">
                                        <synopsis>Must be of type 
'asterisk-publication'.</synopsis>
                                </configOption>
@@ -102,6 +112,18 @@
        unsigned int mailbox_state_filter;
 };
 
+/*! \brief Structure which contains Asterisk AstDB publisher state information */
+struct asterisk_db_publisher_state {
+       /*! \brief The publish client to send PUBLISH messages on */
+       struct ast_sip_outbound_publish_client *client;
+       /*! \brief Subscription to the AstDB cluster topic that feeds this publisher */
+       struct stasis_subscription *db_state_subscription;
+       /*! \brief Regex used for filtering outbound db families (only meaningful when db_state_filter is set) */
+       regex_t db_state_regex;
+       /*! \brief Non-zero when outbound AstDB families must match db_state_regex to be published */
+       unsigned int db_state_filter;
+};
+
 /*! \brief Structure which contains Asterisk publication information */
 struct asterisk_publication_config {
        /*! \brief Sorcery object details */
@@ -112,6 +134,8 @@
                AST_STRING_FIELD(devicestate_publish);
                /*! \brief Optional name of a mailbox state publish item, used 
to request the remote side update us */
                AST_STRING_FIELD(mailboxstate_publish);
+               /*! \brief Optional name of an AstDB publish item, used to 
request the remote side update us */
+               AST_STRING_FIELD(dbstate_publish);
        );
        /*! \brief Accept inbound device state events */
        unsigned int device_state;
@@ -125,6 +149,12 @@
        regex_t mailbox_state_regex;
        /*! \brief Mailbox state should be filtered */
        unsigned int mailbox_state_filter;
+       /*! \brief Accept inbound AstDB state events */
+       unsigned int db_state;
+       /*! \brief Regex used for filtering inbound AstDB state */
+       regex_t db_state_regex;
+       /*! \brief AstDB state should be filtered */
+       unsigned int db_state_filter;
 };
 
 /*! \brief Destroy callback for Asterisk devicestate publisher state 
information from datastore */
@@ -281,6 +311,78 @@
        ast_json_unref(json);
 }
 
+/*!
+ * \brief Stasis callback which sends locally originated shared AstDB events out as a PUBLISH
+ *
+ * \param data The datastore whose \c data member is the asterisk_db_publisher_state for this publisher
+ * \param sub The subscription this message was delivered on
+ * \param msg The message received from the AstDB cluster topic
+ *
+ * \return void
+ */
+static void asterisk_publisher_dbstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
+{
+       struct ast_datastore *datastore = data;
+       struct asterisk_db_publisher_state *publisher_state = datastore->data;
+       struct stasis_message_type *msg_type;
+       struct ast_json *json_db;
+       struct ast_json *json;
+       const struct ast_eid *eid;
+       char eid_str[20];
+       struct ast_db_shared_family *shared_family;
+       char *text;
+       struct ast_sip_body body = {
+               .type = "application",
+               .subtype = "json",
+       };
+
+       if (!stasis_subscription_is_subscribed(sub)) {
+               return;
+       }
+
+       /* Only put/delete messages carry an ast_db_shared_family payload. Any other
+        * message delivered to this subscription (e.g. a subscription change) must
+        * not be interpreted as one. */
+       msg_type = stasis_message_type(msg);
+       if (msg_type != ast_db_put_shared_type() && msg_type != ast_db_del_shared_type()) {
+               return;
+       }
+
+       eid = stasis_message_eid(msg);
+       if (!eid || ast_eid_cmp(&ast_eid_default, eid)) {
+               /* If the event is aggregate, unknown, or didn't originate from this
+                * server, don't send it out. */
+               return;
+       }
+
+       shared_family = stasis_message_data(msg);
+       if (!shared_family) {
+               return;
+       }
+
+       if (publisher_state->db_state_filter && regexec(&publisher_state->db_state_regex, shared_family->name, 0, NULL, 0)) {
+               /* Outgoing AstDB state is filtered and the family wasn't allowed */
+               return;
+       }
+
+       json_db = stasis_message_to_json(msg, NULL);
+       if (!json_db) {
+               return;
+       }
+
+       /* Wrap the event in an envelope identifying its kind and the originating server */
+       ast_eid_to_str(eid_str, sizeof(eid_str), &ast_eid_default);
+       json = ast_json_pack(
+               "{ s: s, s: s, s: o }",
+               "type", "dbstate",
+               "eid", eid_str,
+               "dbstate", json_db);
+       if (!json) {
+               ast_json_unref(json_db);
+               return;
+       }
+
+       text = ast_json_dump_string(json);
+       if (!text) {
+               ast_json_unref(json);
+               return;
+       }
+       body.body_text = text;
+
+       ast_sip_publish_client_send(publisher_state->client, &body);
+
+       ast_json_free(text);
+       ast_json_unref(json);
+}
+
 static int cached_devstate_cb(void *obj, void *arg, int flags)
 {
        struct stasis_message *msg = obj;
@@ -469,6 +571,76 @@
        .stop_publishing = asterisk_stop_mwi_publishing,
 };
 
+/*!
+ * \brief Start publishing shared AstDB events through an outbound publish client
+ *
+ * Allocates the publisher state, attaches it to \a client as the
+ * "asterisk-db-publisher" datastore and subscribes to the AstDB cluster topic.
+ *
+ * \param configuration The outbound publish configuration; its optional extended
+ *        field "db_state_filter" is compiled into the outbound family filter
+ * \param client The publish client PUBLISH requests will be sent on
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ */
+static int asterisk_start_db_publishing(struct ast_sip_outbound_publish *configuration,
+       struct ast_sip_outbound_publish_client *client)
+{
+       RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
+       struct asterisk_db_publisher_state *state;
+       const char *filter;
+
+       /* NOTE(review): this reuses the MWI publisher's datastore info for the AstDB
+        * publisher state - confirm that is intended rather than a dedicated one. */
+       datastore = ast_sip_publish_client_alloc_datastore(&asterisk_mwi_publisher_state_datastore, "asterisk-db-publisher");
+       if (!datastore) {
+               return -1;
+       }
+
+       state = ast_calloc(1, sizeof(*state));
+       if (!state) {
+               return -1;
+       }
+       /* From here on the state travels with the datastore */
+       datastore->data = state;
+
+       filter = ast_sorcery_object_get_extended(configuration, "db_state_filter");
+       if (!ast_strlen_zero(filter)) {
+               if (build_regex(&state->db_state_regex, filter)) {
+                       return -1;
+               }
+               state->db_state_filter = 1;
+       }
+
+       state->client = ao2_bump(client);
+
+       if (ast_sip_publish_client_add_datastore(client, datastore)) {
+               return -1;
+       }
+
+       /* The subscription receives its own reference to the datastore as callback data */
+       state->db_state_subscription = stasis_subscribe(ast_db_cluster_topic(),
+               asterisk_publisher_dbstate_cb, ao2_bump(datastore));
+       if (state->db_state_subscription) {
+               return 0;
+       }
+
+       /* Subscribing failed: detach the datastore and give back the subscription's reference */
+       ast_sip_publish_client_remove_datastore(client, "asterisk-db-publisher");
+       ao2_ref(datastore, -1);
+
+       return -1;
+}
+
+/*!
+ * \brief Stop publishing shared AstDB events on an outbound publish client
+ *
+ * \param client The publish client that was given to asterisk_start_db_publishing
+ *
+ * \retval 0 always
+ */
+static int asterisk_stop_db_publishing(struct ast_sip_outbound_publish_client *client)
+{
+       RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
+       struct asterisk_db_publisher_state *state;
+
+       datastore = ast_sip_publish_client_get_datastore(client, "asterisk-db-publisher");
+       if (!datastore) {
+               /* Nothing was attached to this client, so there is nothing to tear down */
+               return 0;
+       }
+
+       state = datastore->data;
+       if (state->db_state_subscription) {
+               stasis_unsubscribe_and_join(state->db_state_subscription);
+               /* Release the datastore reference the subscription held as its callback data */
+               ao2_ref(datastore, -1);
+       }
+
+       ast_sip_publish_client_remove_datastore(client, "asterisk-db-publisher");
+
+       return 0;
+}
+
+/*! \brief Event publisher handler for the "asterisk-db" event, wiring up the AstDB start/stop callbacks */
+struct ast_sip_event_publisher_handler asterisk_db_publisher_handler = {
+       .event_name = "asterisk-db",
+       .start_publishing = asterisk_start_db_publishing,
+       .stop_publishing = asterisk_stop_db_publishing,
+};
+
 static int asterisk_publication_new(struct ast_sip_endpoint *endpoint, const 
char *resource, const char *event_configuration)
 {
        RAII_VAR(struct asterisk_publication_config *, config, 
ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "asterisk-publication",
@@ -545,6 +717,114 @@
        mailbox = strsep(&item_id, "@");
 
        ast_publish_mwi_state_full(mailbox, item_id, new_msgs, old_msgs, NULL, 
pubsub_eid);
+
+       return 0;
+}
+
+/*!
+ * \brief Handle an inbound 'dbstate' publication by re-publishing it as a shared AstDB message
+ *
+ * \param pub The publication the event arrived on
+ * \param config The asterisk-publication configuration for the resource
+ * \param pubsub_eid The entity ID the shared message is published with
+ * \param json The decoded PUBLISH body; its "dbstate" object must carry "family",
+ *        "verb" ("put" or "delete") and "share_type" ("global" or "unique"), and
+ *        may carry an "entries" array of objects with "key" and "data" strings
+ *
+ * \retval 0 the event was processed, or ignored because it has no 'dbstate' body
+ *         or AstDB state events are not enabled for this resource
+ * \retval -1 the event was malformed or an allocation failed
+ */
+static int asterisk_publication_dbstate(struct ast_sip_publication *pub, struct asterisk_publication_config *config,
+       struct ast_eid *pubsub_eid, struct ast_json *json)
+{
+       struct ast_json *json_db = ast_json_object_get(json, "dbstate");
+       struct ast_json *json_entries;
+       struct stasis_message_type *type;
+       struct ast_db_shared_family *shared_family;
+       struct ast_db_entry *entry = NULL;
+       struct ast_db_entry *cur = NULL;
+       enum ast_db_shared_type share_type;
+       const char *family;
+       const char *verb;
+       const char *str_share_type;
+       size_t entry_count;
+       size_t i;
+
+       if (!json_db) {
+               ast_debug(2, "Received AstDB state event with no 'dbstate' body\n");
+               return 0;
+       }
+
+       if (!config->db_state) {
+               ast_debug(2, "Received AstDB state event for resource '%s' but it is not configured to accept them\n",
+                       ast_sorcery_object_get_id(config));
+               return 0;
+       }
+
+       family = ast_json_string_get(ast_json_object_get(json_db, "family"));
+       if (ast_strlen_zero(family)) {
+               ast_debug(1, "Received incomplete AstDB state event for resource '%s': missing 'family'\n",
+                       ast_sorcery_object_get_id(config));
+               return -1;
+       }
+
+       /* NOTE(review): config->db_state_filter / config->db_state_regex are not
+        * consulted in this function - confirm whether inbound families are meant
+        * to be filtered here. */
+
+       verb = ast_json_string_get(ast_json_object_get(json_db, "verb"));
+       if (ast_strlen_zero(verb)) {
+               ast_debug(1, "Received incomplete AstDB state event for resource '%s': missing 'verb'\n",
+                       ast_sorcery_object_get_id(config));
+               return -1;
+       } else if (!strcasecmp(verb, "put")) {
+               type = ast_db_put_shared_type();
+       } else if (!strcasecmp(verb, "delete")) {
+               type = ast_db_del_shared_type();
+       } else {
+               ast_debug(1, "Received bad AstDB state event for resource '%s': unknown verb '%s'\n",
+                       ast_sorcery_object_get_id(config), verb);
+               return -1;
+       }
+
+       str_share_type = ast_json_string_get(ast_json_object_get(json_db, "share_type"));
+       if (ast_strlen_zero(str_share_type)) {
+               ast_debug(1, "Received incomplete AstDB state event for resource '%s': missing 'share_type'\n",
+                       ast_sorcery_object_get_id(config));
+               return -1;
+       } else if (!strcasecmp(str_share_type, "global")) {
+               share_type = SHARED_DB_TYPE_GLOBAL;
+       } else if (!strcasecmp(str_share_type, "unique")) {
+               share_type = SHARED_DB_TYPE_UNIQUE;
+       } else {
+               ast_debug(1, "Received bad AstDB state event for resource '%s': unknown share_type '%s'\n",
+                       ast_sorcery_object_get_id(config), str_share_type);
+               return -1;
+       }
+
+       /* Build a singly linked list of entries, preserving the order of the JSON array */
+       json_entries = ast_json_object_get(json_db, "entries");
+       entry_count = ast_json_array_size(json_entries);
+       for (i = 0; i < entry_count; i++) {
+               struct ast_db_entry *temp;
+               struct ast_json *json_entry;
+               const char *key;
+               const char *data;
+
+               json_entry = ast_json_array_get(json_entries, i);
+               if (!json_entry) {
+                       continue;
+               }
+               key = ast_json_string_get(ast_json_object_get(json_entry, "key"));
+               data = ast_json_string_get(ast_json_object_get(json_entry, "data"));
+
+               /* Skip entries without a usable key or without any data string */
+               if (ast_strlen_zero(key) || !data) {
+                       continue;
+               }
+
+               temp = ast_db_entry_create(key, data);
+               if (!temp) {
+                       ast_db_freetree(entry);
+                       return -1;
+               }
+
+               if (cur) {
+                       cur->next = temp;
+                       cur = temp;
+               } else {
+                       entry = cur = temp;
+               }
+       }
+
+       shared_family = ast_db_shared_family_alloc(family, share_type);
+       if (!shared_family) {
+               ast_db_freetree(entry);
+               return -1;
+       }
+       /* The entry list now belongs to the shared family */
+       shared_family->entries = entry;
+
+       ast_db_publish_shared_message(type, shared_family, pubsub_eid);
+       ao2_ref(shared_family, -1);
 
        return 0;
 }
@@ -733,6 +1013,75 @@
        return res;
 }
 
+/*!
+ * \brief Handle an inbound AstDB refresh request
+ *
+ * Calls ast_db_refresh_shared() when the configuration names a dbstate publish
+ * item; otherwise the request is ignored.
+ *
+ * \param pub The publication the request arrived on (unused)
+ * \param config The asterisk-publication configuration for the resource
+ * \param pubsub_eid The entity ID of the requester (unused)
+ * \param json The decoded PUBLISH body (unused)
+ *
+ * \retval 0 always
+ */
+static int asterisk_publication_db_refresh(struct ast_sip_publication *pub,
+       struct asterisk_publication_config *config, struct ast_eid *pubsub_eid, struct ast_json *json)
+{
+       if (!ast_strlen_zero(config->dbstate_publish)) {
+               ast_db_refresh_shared();
+       }
+
+       return 0;
+}
+
+static int asterisk_publication_db_state_change(struct ast_sip_publication 
*pub, pjsip_msg_body *body,
+                       enum ast_sip_publish_state state)
+{
+       RAII_VAR(struct asterisk_publication_config *, config, 
ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "asterisk-publication",
+               ast_sip_publication_get_event_configuration(pub)), ao2_cleanup);
+       RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
+       const char *eid, *type;
+       struct ast_eid pubsub_eid;
+       int res = -1;
+
+       /* If no configuration exists for this publication it has most likely 
been removed, so drop this immediately */
+       if (!config) {
+               return -1;
+       }
+
+       /* If no body exists this is a refresh and can be ignored */
+       if (!body) {
+               return 0;
+       }
+

[... 780 lines stripped ...]

-- 
_____________________________________________________________________
-- Bandwidth and Colocation Provided by http://www.api-digital.com --

svn-commits mailing list
To UNSUBSCRIBE or update options visit:
   http://lists.digium.com/mailman/listinfo/svn-commits

Reply via email to