Author: mjordan Date: Mon Mar 9 09:13:38 2015 New Revision: 432635 URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=432635 Log: Initial commit
It mostly lives, breathes, and works. At least with PJSIP. Modified: team/mjordan/trunk-astdb-cluster/funcs/func_db.c team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h team/mjordan/trunk-astdb-cluster/main/db.c team/mjordan/trunk-astdb-cluster/main/utils.c team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c team/mjordan/trunk-astdb-cluster/tests/test_db.c Modified: team/mjordan/trunk-astdb-cluster/funcs/func_db.c URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/funcs/func_db.c?view=diff&rev=432635&r1=432634&r2=432635 ============================================================================== --- team/mjordan/trunk-astdb-cluster/funcs/func_db.c (original) +++ team/mjordan/trunk-astdb-cluster/funcs/func_db.c Mon Mar 9 09:13:38 2015 @@ -1,7 +1,7 @@ /* * Asterisk -- An open source telephony toolkit. * - * Copyright (C) 2005-2006, Russell Bryant <russ...@clemson.edu> + * Copyright (C) 2005-2015, Russell Bryant <russ...@clemson.edu> * * func_db.c adapted from the old app_db.c, copyright by the following people * Copyright (C) 2005, Mark Spencer <marks...@digium.com> @@ -23,6 +23,7 @@ * \brief Functions for interaction with the Asterisk database * * \author Russell Bryant <russ...@clemson.edu> + * \author Matt Jordan <mjor...@digium.com> * * \ingroup functions */ @@ -52,6 +53,30 @@ <syntax argsep="/"> <parameter name="family" required="true" /> <parameter name="key" required="true" /> + </syntax> + <description> + <para>This function will read from or write a value to the Asterisk database. On a + read, this function returns the corresponding value from the database, or blank + if it does not exist. Reading a database value will also set the variable + DB_RESULT. 
If you wish to find out if an entry exists, use the DB_EXISTS + function.</para> + </description> + <see-also> + <ref type="application">DBdel</ref> + <ref type="function">DB_DELETE</ref> + <ref type="application">DBdeltree</ref> + <ref type="function">DB_EXISTS</ref> + </see-also> + </function> + <function name="DB_SHARED" language="en_US"> + <synopsis> + Create or delete a shared family in the Asterisk database. + </synopsis> + <syntax argsep="/"> + <parameter name="action" required="true"> + </parameter> + <parameter name="type"> + </parameter> </syntax> <description> <para>This function will read from or write a value to the Asterisk database. On a @@ -200,14 +225,14 @@ buf[0] = '\0'; if (ast_strlen_zero(parse)) { - ast_log(LOG_WARNING, "DB_EXISTS requires an argument, DB(<family>/<key>)\n"); + ast_log(LOG_WARNING, "DB_EXISTS requires an argument, DB_EXISTS(<family>/<key>)\n"); return -1; } AST_NONSTANDARD_APP_ARGS(args, parse, '/'); if (args.argc < 2) { - ast_log(LOG_WARNING, "DB_EXISTS requires an argument, DB(<family>/<key>)\n"); + ast_log(LOG_WARNING, "DB_EXISTS requires an argument, DB_EXISTS(<family>/<key>)\n"); return -1; } @@ -335,6 +360,106 @@ .write = function_db_delete_write, }; +static int function_db_shared_exists_read(struct ast_channel *chan, + const char *cmd, char *parse, char *buf, size_t len) +{ + AST_DECLARE_APP_ARGS(args, + AST_APP_ARG(family); + ); + + buf[0] = '\0'; + + if (ast_strlen_zero(parse)) { + ast_log(LOG_WARNING, "DB_SHARED_EXISTS requires an argument, DB_SHARED_EXISTS(<family>)\n"); + return -1; + } + + AST_STANDARD_APP_ARGS(args, parse); + + if (args.argc != 1) { + ast_log(LOG_WARNING, "DB_SHARED_EXISTS requires an argument, DB_SHARED_EXISTS(<family>)\n"); + return -1; + } + + if (ast_db_is_shared(args.family)) { + ast_copy_string(buf, "1", len); + } else { + ast_copy_string(buf, "0", len); + } + pbx_builtin_setvar_helper(chan, "DB_RESULT", buf); + + return 0; +} + +static struct ast_custom_function db_shared_exists_function = 
{ + .name = "DB_SHARED_EXISTS", + .read = function_db_shared_exists_read, +}; + +static int function_db_shared_write(struct ast_channel *chan, const char *cmd, char *parse, + const char *value) +{ + enum ast_db_shared_type share_type; + + AST_DECLARE_APP_ARGS(args, + AST_APP_ARG(action); + AST_APP_ARG(type); + ); + + if (ast_strlen_zero(parse)) { + ast_log(LOG_WARNING, "DB_SHARED requires an argument, DB_SHARED(<action>[,<type>])=<family>\n"); + return -1; + } + + AST_STANDARD_APP_ARGS(args, parse); + + if (args.argc < 1) { + ast_log(LOG_WARNING, "DB_SHARED requires an argument, DB_SHARED(<action>[,<type>])=<family>\n"); + return -1; + } + + if (ast_strlen_zero(value)) { + ast_log(LOG_WARNING, "DB_SHARED requires a value, DB_SHARED(<action>[,<type>])=<family>\n"); + return -1; + } + + if (!strcasecmp(args.action, "put")) { + if (ast_strlen_zero(args.type) || !strcasecmp(args.type, "global")) { + share_type = SHARED_DB_TYPE_GLOBAL; + } else if (!strcasecmp(args.type, "unique")) { + share_type = SHARED_DB_TYPE_UNIQUE; + } else { + ast_log(LOG_WARNING, "DB_SHARED: Unknown 'type' %s\n", args.type); + return -1; + } + + if (ast_db_put_shared(value, share_type)) { + /* Generally, failure is benign (key exists) */ + ast_debug(2, "Failed to create shared family '%s'\n", value); + } else { + ast_verb(4, "Created %s shared family '%s'", + share_type == SHARED_DB_TYPE_GLOBAL ? 
"GLOBAL" : "UNIQUE", + value); + } + } else if (!strcasecmp(args.action, "delete")) { + if (ast_db_del_shared(value)) { + /* Generally, failure is benign (key doesn't exist) */ + ast_debug(2, "Failed to delete shared family '%s'\n", value); + } else { + ast_verb(4, "Deleted shared family '%s'\n", value); + } + } else { + ast_log(LOG_WARNING, "DB_SHARED: Unknown 'action' %s\n", args.action); + } + + return 0; +} + +static struct ast_custom_function db_shared_function = { + .name = "DB_SHARED", + .write = function_db_shared_write, +}; + static int unload_module(void) { int res = 0; @@ -343,6 +468,8 @@ res |= ast_custom_function_unregister(&db_exists_function); res |= ast_custom_function_unregister(&db_delete_function); res |= ast_custom_function_unregister(&db_keys_function); + res |= ast_custom_function_unregister(&db_shared_function); + res |= ast_custom_function_unregister(&db_shared_exists_function); return res; } @@ -355,6 +482,8 @@ res |= ast_custom_function_register(&db_exists_function); res |= ast_custom_function_register_escalating(&db_delete_function, AST_CFE_READ); res |= ast_custom_function_register(&db_keys_function); + res |= ast_custom_function_register_escalating(&db_shared_function, AST_CFE_WRITE); + res |= ast_custom_function_register(&db_shared_exists_function); return res; } Modified: team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h?view=diff&rev=432635&r1=432634&r2=432635 ============================================================================== --- team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h (original) +++ team/mjordan/trunk-astdb-cluster/include/asterisk/astdb.h Mon Mar 9 09:13:38 2015 @@ -28,11 +28,73 @@ extern "C" { #endif +#include "asterisk/utils.h" + +enum ast_db_shared_type { + /* Items in the shared family are common across all Asterisk instances */ + SHARED_DB_TYPE_GLOBAL = 0, + /*! 
Items in the shared family are made unique across all Asterisk instances */ + SHARED_DB_TYPE_UNIQUE, +}; + struct ast_db_entry { struct ast_db_entry *next; char *key; char data[0]; }; + +struct stasis_topic; +struct stasis_message_type; + +struct ast_db_shared_family { + /*! How the family is shared */ + enum ast_db_shared_type share_type; + /*! Entries in the family, if appropriate */ + struct ast_db_entry *entries; + /*! The name of the shared family */ + char name[0]; +}; + +struct ast_db_entry *ast_db_entry_create(const char *key, const char *value); + +struct ast_db_shared_family *ast_db_shared_family_alloc(const char *family, enum ast_db_shared_type share_type); + +int ast_db_publish_shared_message(struct stasis_message_type *type, struct ast_db_shared_family *shared_family, struct ast_eid *eid); + +void ast_db_refresh_shared(void); + +/*! \addtogroup StasisTopicsAndMessages + * @{ + */ + +struct stasis_topic *ast_db_cluster_topic(void); + +/*! + * \since 14 + * \brief Message type for a put of entries into a shared AstDB family + * + * \retval A stasis message type + */ +struct stasis_message_type *ast_db_put_shared_type(void); + +/*! + * \since 14 + * \brief Message type for a delete of entries from a shared AstDB family + * + * \retval A stasis message type + */ +struct stasis_message_type *ast_db_del_shared_type(void); + +/*! @} */ + +/*! + * \brief Create a shared family in the Asterisk database + */ +int ast_db_put_shared(const char *family, enum ast_db_shared_type); + +int ast_db_del_shared(const char *family); + +int ast_db_is_shared(const char *family); /*! 
\brief Get key value specified by family/key */ int ast_db_get(const char *family, const char *key, char *value, int valuelen); Modified: team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h?view=diff&rev=432635&r1=432634&r2=432635 ============================================================================== --- team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h (original) +++ team/mjordan/trunk-astdb-cluster/include/asterisk/utils.h Mon Mar 9 09:13:38 2015 @@ -922,7 +922,7 @@ * \brief Convert an EID to a string * \since 1.6.1 */ -char *ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid); +char *ast_eid_to_str(char *s, int maxlen, const struct ast_eid *eid); /*! * \brief Convert a string into an EID Modified: team/mjordan/trunk-astdb-cluster/main/db.c URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/main/db.c?view=diff&rev=432635&r1=432634&r2=432635 ============================================================================== --- team/mjordan/trunk-astdb-cluster/main/db.c (original) +++ team/mjordan/trunk-astdb-cluster/main/db.c Mon Mar 9 09:13:38 2015 @@ -1,7 +1,7 @@ /* * Asterisk -- An open source telephony toolkit. * - * Copyright (C) 1999 - 2005, Digium, Inc. + * Copyright (C) 1999 - 2015, Digium, Inc. * * Mark Spencer <marks...@digium.com> * @@ -22,9 +22,6 @@ * * \author Mark Spencer <marks...@digium.com> * - * \note DB3 is licensed under Sleepycat Public License and is thus incompatible - * with GPL. 
To avoid having to make another exception (and complicate - * licensing even further) we elect to use DB1 which is BSD licensed */ /*** MODULEINFO @@ -45,11 +42,12 @@ #include <dirent.h> #include <sqlite3.h> -#include "asterisk/channel.h" #include "asterisk/file.h" +#include "asterisk/utils.h" +#include "asterisk/astdb.h" +#include "asterisk/stasis.h" +#include "asterisk/stasis_message_router.h" #include "asterisk/app.h" -#include "asterisk/dsp.h" -#include "asterisk/astdb.h" #include "asterisk/cli.h" #include "asterisk/utils.h" #include "asterisk/manager.h" @@ -114,7 +112,16 @@ static int doexit; static int dosync; +/*! \brief A container of families to share across Asterisk instances */ +static struct ao2_container *shared_families; + +static struct stasis_topic *db_cluster_topic; + +static struct stasis_message_router *message_router; + static void db_sync(void); + +#define SHARED_FAMILY "__asterisk_shared_family" #define DEFINE_SQL_STATEMENT(stmt,sql) static sqlite3_stmt *stmt; \ const char stmt##_sql[] = sql; @@ -199,6 +206,76 @@ return res; } + +struct ast_db_entry *ast_db_entry_create(const char *key, const char *value) +{ + struct ast_db_entry *entry; + + entry = ast_malloc(sizeof(*entry) + strlen(key) + strlen(value) + 2); + if (!entry) { + return NULL; + } + entry->next = NULL; + entry->key = entry->data + strlen(value) + 1; + strcpy(entry->data, value); /* safe */ + strcpy(entry->key, key); /* safe */ + + return entry; +} + +static void shared_db_family_dtor(void *obj) +{ + struct ast_db_shared_family *family = obj; + + ast_db_freetree(family->entries); +} + +struct ast_db_shared_family *ast_db_shared_family_alloc(const char *family, enum ast_db_shared_type share_type) +{ + struct ast_db_shared_family *shared_family; + + shared_family = ao2_alloc_options(sizeof(*shared_family) + strlen(family) + 1, + shared_db_family_dtor, OBJ_NOLOCK); + if (!shared_family) { + return NULL; + } + strcpy(shared_family->name, family); /* safe */ + shared_family->share_type 
= share_type; + + return shared_family; +} + +static struct ast_db_shared_family *db_shared_family_clone(const struct ast_db_shared_family *shared_family) +{ + struct ast_db_shared_family *clone; + + clone = ast_db_shared_family_alloc(shared_family->name, shared_family->share_type); + + return clone; +} + +static int db_shared_family_sort_fn(const void *obj_left, const void *obj_right, int flags) +{ + const struct ast_db_shared_family *left = obj_left; + const struct ast_db_shared_family *right = obj_right; + const char *right_key = obj_right; + int cmp; + + switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) { + default: + case OBJ_POINTER: + right_key = right->name; + /* Fall through */ + case OBJ_KEY: + cmp = strcmp(left->name, right_key); + break; + case OBJ_PARTIAL_KEY: + cmp = strncmp(left->name, right_key, strlen(right_key)); + break; + } + return cmp; +} + static int db_create_astdb(void) { @@ -308,7 +385,152 @@ return db_execute_sql("ROLLBACK", NULL, NULL); } -int ast_db_put(const char *family, const char *key, const char *value) +static int db_put_common(const char *family, const char *key, const char *value, int share); + +int ast_db_put_shared(const char *family, enum ast_db_shared_type share_type) +{ + struct ast_db_shared_family *shared_family; + + if (ast_strlen_zero(family)) { + return -1; + } + + ao2_lock(shared_families); + + shared_family = ao2_find(shared_families, family, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (shared_family) { + ao2_ref(shared_family, -1); + ao2_unlock(shared_families); + return -1; + } + + shared_family = ast_db_shared_family_alloc(family, share_type); + if (!shared_family) { + ao2_unlock(shared_families); + return -1; + } + + ao2_link_flags(shared_families, shared_family, OBJ_NOLOCK); + + db_put_common(SHARED_FAMILY, shared_family->name, + share_type == SHARED_DB_TYPE_UNIQUE ? 
"UNIQUE" : "GLOBAL", 0); + + ao2_ref(shared_family, -1); + + ao2_unlock(shared_families); + + return 0; +} + +int ast_db_is_shared(const char *family) +{ + struct ast_db_shared_family *shared_family; + int res = 0; + + shared_family = ao2_find(shared_families, family, OBJ_SEARCH_KEY); + if (shared_family) { + res = 1; + ao2_ref(shared_family, -1); + } + + return res; +} + +static int db_put_shared(const char *family, const char *key, const char *value) +{ + struct ast_db_shared_family *shared_family; + struct ast_db_shared_family *clone; + + /* See if we are shared */ + shared_family = ao2_find(shared_families, family, OBJ_SEARCH_PARTIAL_KEY); + if (!shared_family) { + return 0; + } + + /* Create a Stasis message for the new item */ + clone = db_shared_family_clone(shared_family); + if (!clone) { + ao2_ref(shared_family, -1); + return -1; + } + clone->entries = ast_db_entry_create(key, value); + if (!clone->entries) { + ao2_ref(shared_family, -1); + ao2_ref(clone, -1); + return -1; + } + + /* Publish */ + ast_db_publish_shared_message(ast_db_put_shared_type(), clone, NULL); + + ao2_ref(shared_family, -1); + + return 0; +} + +static int db_del_shared(const char *family, const char *key) +{ + struct ast_db_shared_family *shared_family; + struct ast_db_shared_family *clone; + + /* See if we are shared */ + shared_family = ao2_find(shared_families, family, OBJ_SEARCH_PARTIAL_KEY); + if (!shared_family) { + return 0; + } + + if (ast_strlen_zero(key)) { + clone = ao2_bump(shared_family); + } else { + clone = db_shared_family_clone(shared_family); + if (!clone) { + ao2_ref(shared_family, -1); + return -1; + } + clone->entries = ast_db_entry_create(key, ""); + if (!clone->entries) { + ao2_ref(shared_family, -1); + ao2_ref(clone, -1); + return -1; + } + } + + /* Publish */ + ast_db_publish_shared_message(ast_db_del_shared_type(), clone, NULL); + + ao2_ref(shared_family, -1); + + return 0; +} + +static int db_del_common(const char *family, const char *key, int share); + +int 
ast_db_del_shared(const char *family) +{ + struct ast_db_shared_family *shared_family; + int res = 0; + + if (ast_strlen_zero(family)) { + return -1; + } + + ao2_lock(shared_families); + + shared_family = ao2_find(shared_families, family, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (shared_family) { + ao2_unlink_flags(shared_families, shared_family, OBJ_NOLOCK); + db_del_common(SHARED_FAMILY, shared_family->name, 0); + ao2_ref(shared_family, -1); + } else { + res = -1; + } + + ao2_unlock(shared_families); + + return res; +} + +static int db_put_common(const char *family, const char *key, const char *value, int share) { char fullkey[MAX_DB_FIELD]; size_t fullkey_len; @@ -335,9 +557,17 @@ sqlite3_reset(put_stmt); db_sync(); + if (share) { + db_put_shared(family, key, value); + } ast_mutex_unlock(&dblock); return res; +} + +int ast_db_put(const char *family, const char *key, const char *value) +{ + return db_put_common(family, key, value, 1); } /*! @@ -410,7 +640,7 @@ return db_get_common(family, key, out, -1); } -int ast_db_del(const char *family, const char *key) +static int db_del_common(const char *family, const char *key, int share) { char fullkey[MAX_DB_FIELD]; size_t fullkey_len; @@ -433,12 +663,20 @@ } sqlite3_reset(del_stmt); db_sync(); + if (share) { + db_del_shared(family, key); + } ast_mutex_unlock(&dblock); - return res; -} - -int ast_db_deltree(const char *family, const char *keytree) + return res; +} + +int ast_db_del(const char *family, const char *key) +{ + return db_del_common(family, key, 1); +} + +static int db_deltree_common(const char *family, const char *keytree, int share) { sqlite3_stmt *stmt = deltree_stmt; char prefix[MAX_DB_FIELD]; @@ -468,9 +706,17 @@ res = sqlite3_changes(astdb); sqlite3_reset(stmt); db_sync(); + if (share) { + db_del_shared(prefix, NULL); + } ast_mutex_unlock(&dblock); - return res; + return res; +} + +int ast_db_deltree(const char *family, const char *keytree) +{ + return db_deltree_common(family, keytree, 1); } struct 
ast_db_entry *ast_db_gettree(const char *family, const char *keytree) @@ -508,13 +754,10 @@ if (!(value_s = (const char *) sqlite3_column_text(stmt, 1))) { break; } - if (!(cur = ast_malloc(sizeof(*cur) + strlen(key_s) + strlen(value_s) + 2))) { + cur = ast_db_entry_create(key_s, value_s); + if (!cur) { break; } - cur->next = NULL; - cur->key = cur->data + strlen(value_s) + 1; - strcpy(cur->data, value_s); - strcpy(cur->key, key_s); if (last) { last->next = cur; } else { @@ -748,14 +991,26 @@ while (sqlite3_step(showkey_stmt) == SQLITE_ROW) { const char *key_s, *value_s; + char *family_s; + char *delim; + if (!(key_s = (const char *) sqlite3_column_text(showkey_stmt, 0))) { break; } if (!(value_s = (const char *) sqlite3_column_text(showkey_stmt, 1))) { break; } + family_s = ast_strdup(key_s); + if (!family_s) { + break; + } + delim = strchr(family_s + 1, '/'); + *delim = '\0'; + ++counter; - ast_cli(a->fd, "%-50s: %-25s\n", key_s, value_s); + ast_cli(a->fd, "%-50s: %-25s %s\n", key_s, value_s, + ast_db_is_shared(family_s + 1) ? "(S)" : ""); + ast_free(family_s); } sqlite3_reset(showkey_stmt); ast_mutex_unlock(&dblock); @@ -984,6 +1239,197 @@ return NULL; } +int ast_db_publish_shared_message(struct stasis_message_type *type, struct ast_db_shared_family *shared_family, struct ast_eid *eid) +{ + struct stasis_message *message; + + /* Aggregate doesn't really apply to the AstDB; as such, if we aren't + * provided an EID use our own. 
+ */ + if (!eid) { + eid = &ast_eid_default; + } + + message = stasis_message_create_full(type, shared_family, eid); + if (!message) { + return -1; + } + + stasis_publish(ast_db_cluster_topic(), message); + + return 0; +} + +void ast_db_refresh_shared(void) +{ + struct ao2_iterator it_shared_families; + struct ast_db_shared_family *shared_family; + + it_shared_families = ao2_iterator_init(shared_families, 0); + while ((shared_family = ao2_iterator_next(&it_shared_families))) { + struct ast_db_shared_family *clone; + + clone = db_shared_family_clone(shared_family); + if (!clone) { + ao2_ref(shared_family, -1); + continue; + } + + clone->entries = ast_db_gettree(shared_family->name, ""); + if (!clone->entries) { + ao2_ref(clone, -1); + ao2_ref(shared_family, -1); + continue; + } + + ast_db_publish_shared_message(ast_db_put_shared_type(), clone, NULL); + + ao2_ref(clone, -1); + ao2_ref(shared_family, -1); + } + ao2_iterator_destroy(&it_shared_families); +} + +static struct ast_event *db_del_shared_type_to_event(struct stasis_message *message) +{ + return NULL; +} + +static struct ast_json *db_entries_to_json(struct ast_db_entry *entry) +{ + struct ast_json *json; + struct ast_db_entry *cur; + + json = ast_json_array_create(); + if (!json) { + return NULL; + } + + for (cur = entry; cur; cur = cur->next) { + struct ast_json *json_entry; + + json_entry = ast_json_pack("{s: s, s: s}", + "key", cur->key, + "data", cur->data); + if (!json_entry) { + ast_json_unref(json); + return NULL; + } + + if (ast_json_array_append(json, json_entry)) { + ast_json_unref(json); + return NULL; + } + } + + return json; +} + +static struct ast_json *db_shared_family_to_json(struct stasis_message *message, + const struct stasis_message_sanitizer *sanitize) +{ + struct stasis_message_type *type = stasis_message_type(message); + struct ast_db_shared_family *shared_family; + + shared_family = stasis_message_data(message); + if (!shared_family) { + return NULL; + } + + return ast_json_pack("{s: 
s, s: s, s: s, s: o}", + "verb", type == ast_db_put_shared_type() ? "put" : "delete", + "family", shared_family->name, + "share_type", shared_family->share_type == SHARED_DB_TYPE_UNIQUE ? "unique" : "global", + "entries", shared_family->entries ? db_entries_to_json(shared_family->entries) : ast_json_null()); +} + +static struct ast_event *db_put_shared_type_to_event(struct stasis_message *message) +{ + return NULL; +} + +struct stasis_topic *ast_db_cluster_topic(void) +{ + return db_cluster_topic; +} + +STASIS_MESSAGE_TYPE_DEFN(ast_db_put_shared_type, + .to_event = db_put_shared_type_to_event, + .to_json = db_shared_family_to_json, + ); +STASIS_MESSAGE_TYPE_DEFN(ast_db_del_shared_type, + .to_event = db_del_shared_type_to_event, + .to_json = db_shared_family_to_json, + ); + +static void db_put_shared_msg_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) +{ + struct ast_db_shared_family *shared_family; + struct ast_db_shared_family *shared_check; + struct ast_db_entry *cur; + const struct ast_eid *eid; + char *family_id; + + shared_family = stasis_message_data(message); + if (!shared_family) { + return; + } + + eid = stasis_message_eid(message); + if (!eid || !ast_eid_cmp(eid, &ast_eid_default)) { + return; + } + + /* Don't update if we don't have this area shared on this server */ + shared_check = ao2_find(shared_families, shared_family->name, OBJ_KEY); + if (!shared_check) { + return; + } + ao2_ref(shared_check, -1); + + if (shared_family->share_type == SHARED_DB_TYPE_UNIQUE) { + char eid_workspace[20]; + + /* Length is family + '/' + EID length (20) + 1 */ + family_id = ast_alloca(strlen(shared_family->name) + 22); + ast_eid_to_str(eid_workspace, sizeof(eid_workspace), eid); + sprintf(family_id, "%s/%s", eid_workspace, shared_family->name); /* safe */ + } else { + family_id = shared_family->name; + } + + for (cur = shared_family->entries; cur; cur = cur->next) { + db_put_common(family_id, cur->key, cur->data, 0); + } +} + +static void 
db_del_shared_msg_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) +{ + struct ast_db_shared_family *shared_family; + struct ast_db_entry *cur; + const struct ast_eid *eid; + + shared_family = stasis_message_data(message); + if (!shared_family) { + return; + } + + eid = stasis_message_eid(message); + if (!eid || !ast_eid_cmp(eid, &ast_eid_default)) { + return; + } + + cur = shared_family->entries; + if (!cur) { + db_deltree_common(shared_family->name, NULL, 0); + return; + } + + for (; cur; cur = cur->next) { + db_del_common(shared_family->name, cur->key, 0); + } +} + /*! * \internal * \brief Clean up resources on Asterisk shutdown @@ -995,6 +1441,11 @@ ast_manager_unregister("DBPut"); ast_manager_unregister("DBDel"); ast_manager_unregister("DBDelTree"); + + ao2_cleanup(db_cluster_topic); + db_cluster_topic = NULL; + STASIS_MESSAGE_TYPE_CLEANUP(ast_db_put_shared_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_db_del_shared_type); /* Set doexit to 1 to kill thread. db_sync must be called with * mutex held. 
*/ @@ -1005,6 +1456,10 @@ pthread_join(syncthread, NULL); ast_mutex_lock(&dblock); + + ao2_ref(shared_families, -1); + shared_families = NULL; + clean_statements(); if (sqlite3_close(astdb) == SQLITE_OK) { astdb = NULL; @@ -1014,12 +1469,43 @@ int astdb_init(void) { + shared_families = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX, + AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT | AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT, + db_shared_family_sort_fn, NULL); + if (!shared_families) { + return -1; + } + + db_cluster_topic = stasis_topic_create("ast_db_cluster_topic"); + if (!db_cluster_topic) { + ao2_ref(shared_families, -1); + return -1; + } + + STASIS_MESSAGE_TYPE_INIT(ast_db_put_shared_type); + STASIS_MESSAGE_TYPE_INIT(ast_db_del_shared_type); + + message_router = stasis_message_router_create_pool(ast_db_cluster_topic()); + if (!message_router) { + ao2_ref(db_cluster_topic, -1); + ao2_ref(shared_families, -1); + return -1; + } + stasis_message_router_add(message_router, ast_db_put_shared_type(), + db_put_shared_msg_cb, NULL); + stasis_message_router_add(message_router, ast_db_del_shared_type(), + db_del_shared_msg_cb, NULL); + if (db_init()) { + ao2_ref(db_cluster_topic, -1); + ao2_ref(shared_families, -1); return -1; } ast_cond_init(&dbcond, NULL); if (ast_pthread_create_background(&syncthread, NULL, db_sync_thread, NULL)) { + ao2_ref(db_cluster_topic, -1); + ao2_ref(shared_families, -1); return -1; } Modified: team/mjordan/trunk-astdb-cluster/main/utils.c URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/main/utils.c?view=diff&rev=432635&r1=432634&r2=432635 ============================================================================== --- team/mjordan/trunk-astdb-cluster/main/utils.c (original) +++ team/mjordan/trunk-astdb-cluster/main/utils.c Mon Mar 9 09:13:38 2015 @@ -2691,7 +2691,7 @@ } #endif /* defined(AST_DEVMODE) */ -char *ast_eid_to_str(char *s, int maxlen, struct ast_eid *eid) +char *ast_eid_to_str(char *s, int maxlen, const struct 
ast_eid *eid) { int x; char *os = s; Modified: team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c URL: http://svnview.digium.com/svn/asterisk/team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c?view=diff&rev=432635&r1=432634&r2=432635 ============================================================================== --- team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c (original) +++ team/mjordan/trunk-astdb-cluster/res/res_pjsip_publish_asterisk.c Mon Mar 9 09:13:38 2015 @@ -36,6 +36,7 @@ #include "asterisk/module.h" #include "asterisk/logger.h" #include "asterisk/app.h" +#include "asterisk/astdb.h" /*** DOCUMENTATION <configInfo name="res_pjsip_publish_asterisk" language="en_US"> @@ -58,6 +59,9 @@ <configOption name="mailboxstate_publish"> <synopsis>Optional name of a publish item that can be used to publish a request for full mailbox state information.</synopsis> </configOption> + <configOption name="dbstate_publish"> + <synopsis>Optional name of a publish item that can be used to publish a request for full AstDB state information.</synopsis> + </configOption> <configOption name="device_state" default="no"> <synopsis>Whether we should permit incoming device state events.</synopsis> </configOption> @@ -70,6 +74,12 @@ <configOption name="mailbox_state_filter"> <synopsis>Optional regular expression used to filter what mailboxes we accept events for.</synopsis> </configOption> + <configOption name="db_state" default="no"> + <synopsis>Whether we should permit incoming AstDB state events.</synopsis> + </configOption> + <configOption name="db_state_filter"> + <synopsis>Optional regular expression used to filter what AstDB families we accept events for.</synopsis> + </configOption> <configOption name="type"> <synopsis>Must be of type 'asterisk-publication'.</synopsis> </configOption> @@ -102,6 +112,18 @@ unsigned int mailbox_state_filter; }; +/*! 
\brief Structure which contains Asterisk AstDB publisher state information */ +struct asterisk_db_publisher_state { + /*! \brief The publish client to send PUBLISH messages on */ + struct ast_sip_outbound_publish_client *client; + /*! \brief AstDB subscription */ + struct stasis_subscription *db_state_subscription; + /*! \brief Regex used for filtering outbound db families */ + regex_t db_state_regex; + /*! \brief AstDB families should be filtered */ + unsigned int db_state_filter; +}; + /*! \brief Structure which contains Asterisk publication information */ struct asterisk_publication_config { /*! \brief Sorcery object details */ @@ -112,6 +134,8 @@ AST_STRING_FIELD(devicestate_publish); /*! \brief Optional name of a mailbox state publish item, used to request the remote side update us */ AST_STRING_FIELD(mailboxstate_publish); + /*! \brief Optional name of an AstDB publish item, used to request the remote side update us */ + AST_STRING_FIELD(dbstate_publish); ); /*! \brief Accept inbound device state events */ unsigned int device_state; @@ -125,6 +149,12 @@ regex_t mailbox_state_regex; /*! \brief Mailbox state should be filtered */ unsigned int mailbox_state_filter; + /*! \brief Accept inbound AstDB state events */ + unsigned int db_state; + /*! \brief Regex used for filtering inbound AstDB state */ + regex_t db_state_regex; + /*! \brief AstDB state should be filtered */ + unsigned int db_state_filter; }; /*! \brief Destroy callback for Asterisk devicestate publisher state information from datastore */ @@ -281,6 +311,78 @@ ast_json_unref(json); } +/*! 
+ * \brief Callback function for db state events + * \param ast_event + * \param data void pointer to ast_client structure + * \return void + */ +static void asterisk_publisher_dbstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg) +{ + struct ast_datastore *datastore = data; + struct asterisk_db_publisher_state *publisher_state = datastore->data; + struct ast_json *json_db; + struct ast_json *json; + const struct ast_eid *eid; + char eid_str[20]; + struct ast_db_shared_family *shared_family; + char *text; + struct ast_sip_body body = { + .type = "application", + .subtype = "json", + }; + + if (!stasis_subscription_is_subscribed(sub)) { + return; + } + + eid = stasis_message_eid(msg); + if (!eid || ast_eid_cmp(&ast_eid_default, eid)) { + /* If the event is aggregate, unknown, or didn't originate from this + * server, don't send it out. */ + return; + } + + shared_family = stasis_message_data(msg); + if (!shared_family) { + return; + } + + if (publisher_state->db_state_filter && regexec(&publisher_state->db_state_regex, shared_family->name, 0, NULL, 0)) { + /* Outgoing AstDB state is filtered and the family wasn't allowed */ + return; + } + + json_db = stasis_message_to_json(msg, NULL); + if (!json_db) { + return; + } + + + ast_eid_to_str(eid_str, sizeof(eid_str), &ast_eid_default); + json = ast_json_pack( + "{ s: s, s: s, s: o }", + "type", "dbstate", + "eid", eid_str, + "dbstate", json_db); + if (!json) { + ast_json_unref(json_db); + return; + } + + text = ast_json_dump_string(json); + if (!text) { + ast_json_unref(json); + return; + } + body.body_text = text; + + ast_sip_publish_client_send(publisher_state->client, &body); + + ast_json_free(text); + ast_json_unref(json); +} + static int cached_devstate_cb(void *obj, void *arg, int flags) { struct stasis_message *msg = obj; @@ -469,6 +571,76 @@ .stop_publishing = asterisk_stop_mwi_publishing, }; +static int asterisk_start_db_publishing(struct ast_sip_outbound_publish *configuration, + 
struct ast_sip_outbound_publish_client *client) +{ + RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup); + struct asterisk_db_publisher_state *publisher_state; + const char *value; + + datastore = ast_sip_publish_client_alloc_datastore(&asterisk_mwi_publisher_state_datastore, "asterisk-db-publisher"); + if (!datastore) { + return -1; + } + + publisher_state = ast_calloc(1, sizeof(*publisher_state)); + if (!publisher_state) { + return -1; + } + datastore->data = publisher_state; + + value = ast_sorcery_object_get_extended(configuration, "db_state_filter"); + if (!ast_strlen_zero(value)) { + if (build_regex(&publisher_state->db_state_regex, value)) { + return -1; + } + publisher_state->db_state_filter = 1; + } + + publisher_state->client = ao2_bump(client); + + if (ast_sip_publish_client_add_datastore(client, datastore)) { + return -1; + } + + publisher_state->db_state_subscription = stasis_subscribe(ast_db_cluster_topic(), + asterisk_publisher_dbstate_cb, ao2_bump(datastore)); + if (!publisher_state->db_state_subscription) { + ast_sip_publish_client_remove_datastore(client, "asterisk-db-publisher"); + ao2_ref(datastore, -1); + return -1; + } + + return 0; +} + +static int asterisk_stop_db_publishing(struct ast_sip_outbound_publish_client *client) +{ + RAII_VAR(struct ast_datastore *, datastore, ast_sip_publish_client_get_datastore(client, "asterisk-db-publisher"), + ao2_cleanup); + struct asterisk_db_publisher_state *publisher_state; + + if (!datastore) { + return 0; + } + + publisher_state = datastore->data; + if (publisher_state->db_state_subscription) { + stasis_unsubscribe_and_join(publisher_state->db_state_subscription); + ao2_ref(datastore, -1); + } + + ast_sip_publish_client_remove_datastore(client, "asterisk-db-publisher"); + + return 0; +} + +struct ast_sip_event_publisher_handler asterisk_db_publisher_handler = { + .event_name = "asterisk-db", + .start_publishing = asterisk_start_db_publishing, + .stop_publishing = asterisk_stop_db_publishing, 
+}; + static int asterisk_publication_new(struct ast_sip_endpoint *endpoint, const char *resource, const char *event_configuration) { RAII_VAR(struct asterisk_publication_config *, config, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "asterisk-publication", @@ -545,6 +717,114 @@ mailbox = strsep(&item_id, "@"); ast_publish_mwi_state_full(mailbox, item_id, new_msgs, old_msgs, NULL, pubsub_eid); + + return 0; +} + +static int asterisk_publication_dbstate(struct ast_sip_publication *pub, struct asterisk_publication_config *config, + struct ast_eid *pubsub_eid, struct ast_json *json) +{ + struct ast_json *json_db = ast_json_object_get(json, "dbstate"); + struct ast_json *json_entries; + struct stasis_message_type *type; + struct ast_db_shared_family *shared_family; + struct ast_db_entry *entry = NULL; + struct ast_db_entry *cur = NULL; + enum ast_db_shared_type share_type; + const char *family; + const char *verb; + const char *str_share_type; + int i; + + if (!json_db) { + ast_debug(2, "Received AstDB state event with no 'dbstate' body\n"); + return 0; + } + + if (!config->db_state) { + ast_debug(2, "Received AstDB state event for resource '%s' but it is not configured to accept them\n", + ast_sorcery_object_get_id(config)); + return 0; + } + + family = ast_json_string_get(ast_json_object_get(json_db, "family")); + if (ast_strlen_zero(family)) { + ast_debug(1, "Received incomplete AstDB state event for resource '%s': missing 'family'\n", + ast_sorcery_object_get_id(config)); + return -1; + } + + verb = ast_json_string_get(ast_json_object_get(json_db, "verb")); + if (ast_strlen_zero(verb)) { + ast_debug(1, "Received incomplete AstDB state event for resource '%s': missing 'verb'\n", + ast_sorcery_object_get_id(config)); + return -1; + } else if (!strcasecmp(verb, "put")) { + type = ast_db_put_shared_type(); + } else if (!strcasecmp(verb, "delete")) { + type = ast_db_del_shared_type(); + } else { + ast_debug(1, "Received bad AstDB state event for resource '%s': 
unknown verb '%s'\n", + ast_sorcery_object_get_id(config), verb); + return -1; + } + + str_share_type = ast_json_string_get(ast_json_object_get(json_db, "share_type")); + if (ast_strlen_zero(str_share_type)) { + ast_debug(1, "Received incomplete AstDB state event for resource '%s': missing 'share_type'\n", + ast_sorcery_object_get_id(config)); + return -1; + } else if (!strcasecmp(str_share_type, "global")) { + share_type = SHARED_DB_TYPE_GLOBAL; + } else if (!strcasecmp(str_share_type, "unique")) { + share_type = SHARED_DB_TYPE_UNIQUE; + } else { + ast_debug(1, "Received bad AstDB state event for resource '%s': unknown verb '%s'\n", + ast_sorcery_object_get_id(config), str_share_type); + return -1; + } + + json_entries = ast_json_object_get(json_db, "entries"); + for (i = 0; i < ast_json_array_size(json_entries); i++) { + struct ast_db_entry *temp; + struct ast_json *json_entry; + const char *key; + const char *data; + + json_entry = ast_json_array_get(json_entries, i); + if (!json_entry) { + continue; + } + key = ast_json_string_get(ast_json_object_get(json_entry, "key")); + data = ast_json_string_get(ast_json_object_get(json_entry, "data")); + + if (ast_strlen_zero(key) || !data) { + continue; + } + + temp = ast_db_entry_create(key, data); + if (!temp) { + ast_db_freetree(entry); + return -1; + } + + if (cur) { + cur->next = temp; + cur = temp; + } else { + entry = cur = temp; + } + } + + shared_family = ast_db_shared_family_alloc(family, share_type); + if (!shared_family) { + ast_db_freetree(entry); + return -1; + } + shared_family->entries = entry; + + ast_db_publish_shared_message(type, shared_family, pubsub_eid); + ao2_ref(shared_family, -1); return 0; } @@ -733,6 +1013,75 @@ return res; } +static int asterisk_publication_db_refresh(struct ast_sip_publication *pub, + struct asterisk_publication_config *config, struct ast_eid *pubsub_eid, struct ast_json *json) +{ + if (ast_strlen_zero(config->dbstate_publish)) { + return 0; + } + + ast_db_refresh_shared(); + 
+ return 0; +} + +static int asterisk_publication_db_state_change(struct ast_sip_publication *pub, pjsip_msg_body *body, + enum ast_sip_publish_state state) +{ + RAII_VAR(struct asterisk_publication_config *, config, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "asterisk-publication", + ast_sip_publication_get_event_configuration(pub)), ao2_cleanup); + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + const char *eid, *type; + struct ast_eid pubsub_eid; + int res = -1; + + /* If no configuration exists for this publication it has most likely been removed, so drop this immediately */ + if (!config) { + return -1; + } + + /* If no body exists this is a refresh and can be ignored */ + if (!body) { + return 0; + } + [... 780 lines stripped ...] -- _____________________________________________________________________ -- Bandwidth and Colocation Provided by http://www.api-digital.com -- svn-commits mailing list To UNSUBSCRIBE or update options visit: http://lists.digium.com/mailman/listinfo/svn-commits