Author: dlee Date: Wed Jan 21 13:59:24 2015 New Revision: 430900 URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=430900 Log: First cut at AMQP backend for CDR/CEL
Added: team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c (with props) team/dlee/amqp-cdr-cel/cel/cel_amqp.c (with props) team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample (with props) team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample (with props) team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample (with props) team/dlee/amqp-cdr-cel/include/asterisk/amqp.h (with props) team/dlee/amqp-cdr-cel/res/amqp/ team/dlee/amqp-cdr-cel/res/amqp/cli.c (with props) team/dlee/amqp-cdr-cel/res/amqp/config.c (with props) team/dlee/amqp-cdr-cel/res/amqp/internal.h (with props) team/dlee/amqp-cdr-cel/res/res_amqp.c (with props) team/dlee/amqp-cdr-cel/res/res_amqp.exports.in (with props) Modified: team/dlee/amqp-cdr-cel/build_tools/menuselect-deps.in team/dlee/amqp-cdr-cel/configure team/dlee/amqp-cdr-cel/configure.ac team/dlee/amqp-cdr-cel/contrib/scripts/install_prereq team/dlee/amqp-cdr-cel/include/asterisk/autoconfig.h.in team/dlee/amqp-cdr-cel/include/asterisk/cel.h team/dlee/amqp-cdr-cel/makeopts.in team/dlee/amqp-cdr-cel/res/Makefile team/dlee/amqp-cdr-cel/res/res_pjsip_config_wizard.c Modified: team/dlee/amqp-cdr-cel/build_tools/menuselect-deps.in URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/build_tools/menuselect-deps.in?view=diff&rev=430900&r1=430899&r2=430900 ============================================================================== --- team/dlee/amqp-cdr-cel/build_tools/menuselect-deps.in (original) +++ team/dlee/amqp-cdr-cel/build_tools/menuselect-deps.in Wed Jan 21 13:59:24 2015 @@ -51,6 +51,7 @@ PRI=@PBX_PRI@ OPENR2=@PBX_OPENR2@ RESAMPLE=@PBX_RESAMPLE@ +RABBITMQ=@PBX_RABBITMQ@ RADIUS=@PBX_RADIUS@ LAUNCHD=@PBX_LAUNCHD@ SPANDSP=@PBX_SPANDSP@ Added: team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c?view=auto&rev=430900 ============================================================================== --- team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c (added) +++ team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c Wed Jan 21 13:59:24 2015 @@ -1,0 +1,385 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2015, Digium, Inc. + * + * David M. Lee, II <d...@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief AMQP CDR Backend + * + * \author David M. Lee, II <d...@digium.com> + */ + +/*** MODULEINFO + <depend>res_amqp</depend> + <support_level>core</support_level> + ***/ + +/*** DOCUMENTATION + <configInfo name="cdr_amqp" language="en_US"> + <synopsis>AMQP CDR Backend</synopsis> + <configFile name="cdr_amqp.conf"> + <configObject name="global"> + <synopsis>Global configuration settings</synopsis> + <configOption name="loguniqueid"> + <synopsis></synopsis> + <description> + <para></para> + </description> + </configOption> + <configOption name="loguniqueid"> + <synopsis>Determines whether to log the uniqueid for calls</synopsis> + <description> + <para>Defaults is no.</para> + </description> + </configOption> + <configOption name="loguserfield"> + <synopsis>Determines whether to log the user field for calls</synopsis> + <description> + <para>Default is no.</para> + </description> + </configOption> + <configOption name="connection"> + <synopsis>Name of the connection from amqp.conf to use</synopsis> + <description> + <para>Specifies the name of the connection from amqp.conf to use</para> + </description> + </configOption> + <configOption name="queue"> + <synopsis>Name of the queue to post to</synopsis> + <description> + <para>Defaults to asterisk_cdr</para> + </description> + </configOption> + <configOption name="exchange"> + <synopsis>Name of the exchange to post to</synopsis> + <description> + <para>Defaults to empty string</para> + </description> + </configOption> + </configObject> + </configFile> + </configInfo> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/cdr.h" +#include "asterisk/config_options.h" +#include "asterisk/json.h" +#include "asterisk/module.h" +#include "asterisk/amqp.h" +#include "asterisk/stringfields.h" + +#define CDR_NAME "AMQP" +#define CONF_FILENAME "cdr_amqp.conf" + +/*! \brief global config structure */ +struct cdr_amqp_global_conf { + AST_DECLARE_STRING_FIELDS( + /*! \brief connection name */ + AST_STRING_FIELD(connection); + /*! \brief queue name */ + AST_STRING_FIELD(queue); + /*! \brief exchange name */ + AST_STRING_FIELD(exchange); + ); + /*! \brief whether to log the unique id */ + int loguniqueid; + /*! \brief whether to log the user field */ + int loguserfield; + + /*! \brief current connection to amqp */ + struct ast_amqp_connection *amqp; +}; + +/*! \brief cdr_amqp configuration */ +struct cdr_amqp_conf { + struct cdr_amqp_global_conf *global; +}; + +/*! \brief Locking container for safe configuration access. */ +static AO2_GLOBAL_OBJ_STATIC(confs); + +static struct aco_type global_option = { + .type = ACO_GLOBAL, + .name = "global", + .item_offset = offsetof(struct cdr_amqp_conf, global), + .category = "^global$", + .category_match = ACO_WHITELIST, +}; + +static struct aco_type *global_options[] = ACO_TYPES(&global_option); + +static void conf_global_dtor(void *obj) +{ + struct cdr_amqp_global_conf *global = obj; + ao2_cleanup(global->amqp); + ast_string_field_free_memory(global); +} + +static struct cdr_amqp_global_conf *conf_global_create(void) +{ + RAII_VAR(struct cdr_amqp_global_conf *, global, NULL, ao2_cleanup); + + global = ao2_alloc(sizeof(*global), conf_global_dtor); + if (!global) { + return NULL; + } + + if (ast_string_field_init(global, 64) != 0) { + return NULL; + } + + aco_set_defaults(&global_option, "global", global); + + return ao2_bump(global); +} + +/*! \brief The conf file that's processed for the module. */ +static struct aco_file conf_file = { + /*! The config file name. */ + .filename = CONF_FILENAME, + /*! The mapping object types to be processed. */ + .types = ACO_TYPES(&global_option), +}; + +static void conf_dtor(void *obj) +{ + struct cdr_amqp_conf *conf = obj; + + ao2_cleanup(conf->global); +} + +static void *conf_alloc(void) +{ + RAII_VAR(struct cdr_amqp_conf *, conf, NULL, ao2_cleanup); + + conf = ao2_alloc_options(sizeof(*conf), conf_dtor, + AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!conf) { + return NULL; + } + + conf->global = conf_global_create(); + if (!conf->global) { + return NULL; + } + + return ao2_bump(conf); +} + +CONFIG_INFO_STANDARD(cfg_info, confs, conf_alloc, + .files = ACO_FILES(&conf_file)); + +/*! + * \brief CDR handler for AMQP. + * + * \param cdr CDR to log. + * \return 0 on success. + * \return -1 on error. + */ +static int amqp_cdr_log(struct ast_cdr *cdr) +{ + RAII_VAR(struct cdr_amqp_conf *, conf, NULL, ao2_cleanup); + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + RAII_VAR(char *, str, NULL, ast_json_free); + RAII_VAR(struct ast_json *, disposition, NULL, ast_json_unref); + int res; + amqp_basic_properties_t props = { + ._flags = AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_CONTENT_TYPE_FLAG, + .delivery_mode = 2, /* persistent delivery mode */ + .content_type = amqp_cstring_bytes("application/json") + }; + + conf = ao2_global_obj_ref(confs); + if (!conf || !conf->global) { + ast_log(LOG_ERROR, "Error obtaining config from cdr_amqp.conf\n"); + return -1; + } + + if (!conf->global->amqp) { + ast_log(LOG_ERROR, "No AMQP connection; discarding CDR\n"); + return -1; + } + + json = ast_json_pack("{" + /* clid, src, dst, dcontext */ + "s: s, s: s, s: s, s: s," + /* channel, dstchannel, lastapp, lastdata */ + "s: s, s: s, s: s, s: s," + /* start, answer, end, duration */ + "s: o, s: o, s: o, s: i" + /* billsec, disposition, accountcode, amaflags */ + "s: i, s: s, s: s, s: s" + /* peeraccount, linkedid */ + "s: s, s: s }", + + "clid", cdr->clid, + "src", cdr->src, + "dst", cdr->dst, + "dcontext", cdr->dcontext, + + "channel", cdr->channel, + "dstchannel", cdr->dstchannel, + "lastapp", cdr->lastapp, + "lastdata", cdr->lastdata, + + "start", ast_json_timeval(cdr->start, NULL), + "answer", ast_json_timeval(cdr->answer, NULL), + "end", ast_json_timeval(cdr->end, NULL), + "durationsec", cdr->duration, + + "billsec", cdr->billsec, + "disposition", ast_cdr_disp2str(cdr->disposition), + "accountcode", cdr->accountcode, + "amaflags", ast_channel_amaflags2string(cdr->amaflags), + + "peeraccount", cdr->peeraccount, + "linkedid", cdr->linkedid); + if (!json) { + return -1; + } + + /* Set optional fields */ + if (conf->global->loguniqueid) { + ast_json_object_set(json, + "uniqueid", ast_json_string_create(cdr->uniqueid)); + } + + if (conf->global->loguserfield) { + ast_json_object_set(json, + "userfield", ast_json_string_create(cdr->userfield)); + } + + /* Dump the JSON to a string for publication */ + str = ast_json_dump_string(json); + if (!str) { + return -1; + } + + res = ast_amqp_basic_publish(conf->global->amqp, + amqp_cstring_bytes(conf->global->exchange), + amqp_cstring_bytes(conf->global->queue), + 0, /* mandatory; don't return unsendable messages */ + 0, /* immediate; allow messages to be queued */ + &props, + amqp_cstring_bytes(str)); + + if (res != 0) { + ast_log(LOG_ERROR, "Error publishing CDR to AMQP\n"); + return -1; + } + + return 0; +} + + +static int load_config(int reload) +{ + RAII_VAR(struct cdr_amqp_conf *, conf, NULL, ao2_cleanup); + RAII_VAR(struct ast_amqp_connection *, amqp, NULL, ao2_cleanup); + + switch (aco_process_config(&cfg_info, reload)) { + case ACO_PROCESS_ERROR: + return -1; + case ACO_PROCESS_OK: + case ACO_PROCESS_UNCHANGED: + break; + } + + conf = ao2_global_obj_ref(confs); + if (!conf || !conf->global) { + ast_log(LOG_ERROR, "Error obtaining config from cdr_amqp.conf\n"); + return -1; + } + + /* Refresh the AMQP connection */ + ao2_cleanup(conf->global->amqp); + conf->global->amqp = ast_amqp_get_connection(conf->global->connection); + + if (!conf->global->amqp) { + ast_log(LOG_ERROR, "Could not get AMQP connection %s\n", + conf->global->connection); + return -1; + } + + return 0; +} + +static int load_module(void) +{ + if (aco_info_init(&cfg_info) != 0) { + ast_log(LOG_ERROR, "Failed to initialize config"); + aco_info_destroy(&cfg_info); + return -1; + } + + aco_option_register(&cfg_info, "loguniqueid", ACO_EXACT, + global_options, "no", OPT_BOOL_T, 1, + FLDSET(struct cdr_amqp_global_conf, loguniqueid)); + aco_option_register(&cfg_info, "loguserfield", ACO_EXACT, + global_options, "no", OPT_BOOL_T, 1, + FLDSET(struct cdr_amqp_global_conf, loguserfield)); + aco_option_register(&cfg_info, "connection", ACO_EXACT, + global_options, "", OPT_STRINGFIELD_T, 0, + STRFLDSET(struct cdr_amqp_global_conf, connection)); + aco_option_register(&cfg_info, "queue", ACO_EXACT, + global_options, "asterisk_cdr", OPT_STRINGFIELD_T, 0, + STRFLDSET(struct cdr_amqp_global_conf, queue)); + aco_option_register(&cfg_info, "exchange", ACO_EXACT, + global_options, "", OPT_STRINGFIELD_T, 0, + STRFLDSET(struct cdr_amqp_global_conf, exchange)); + + if (load_config(0) != 0) { + ast_log(LOG_WARNING, "Configuration failed to load\n"); + return AST_MODULE_LOAD_DECLINE; + } + + if (ast_cdr_register(CDR_NAME, ast_module_info->description, amqp_cdr_log) != 0) { + ast_log(LOG_ERROR, "Could not register CDR backend\n"); + return AST_MODULE_LOAD_FAILURE; + } + + ast_log(LOG_NOTICE, "CDR AMQP logging enabled\n"); + return AST_MODULE_LOAD_SUCCESS; +} + +static int unload_module(void) +{ + aco_info_destroy(&cfg_info); + ao2_global_obj_release(confs); + if (ast_cdr_unregister(CDR_NAME) != 0) { + return -1; + } + + return 0; +} + +static int reload_module(void) +{ + return load_config(1); +} + +AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "AMQP CDR Backend", + .support_level = AST_MODULE_SUPPORT_CORE, + .load = load_module, + .unload = unload_module, + .reload = reload_module, + .load_pri = AST_MODPRI_CDR_DRIVER, + ); Propchange: team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c ------------------------------------------------------------------------------ svn:eol-style = native Propchange: team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c ------------------------------------------------------------------------------ svn:keywords = Author Date Id Revision Propchange: team/dlee/amqp-cdr-cel/cdr/cdr_amqp.c ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: team/dlee/amqp-cdr-cel/cel/cel_amqp.c URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/cel/cel_amqp.c?view=auto&rev=430900 ============================================================================== --- team/dlee/amqp-cdr-cel/cel/cel_amqp.c (added) +++ team/dlee/amqp-cdr-cel/cel/cel_amqp.c Wed Jan 21 13:59:24 2015 @@ -1,0 +1,379 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2015, Digium, Inc. + * + * David M. Lee, II <d...@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief AMQP CEL Backend + * + * \author David M. Lee, II <d...@digium.com> + */ + +/*** MODULEINFO + <depend>res_amqp</depend> + <support_level>core</support_level> + ***/ + +/*** DOCUMENTATION + <configInfo name="cel_amqp" language="en_US"> + <synopsis>AMQP CEL Backend</synopsis> + <configFile name="cel_amqp.conf"> + <configObject name="global"> + <synopsis>Global configuration settings</synopsis> + <configOption name="loguniqueid"> + <synopsis></synopsis> + <description> + <para></para> + </description> + </configOption> + <configOption name="connection"> + <synopsis>Name of the connection from amqp.conf to use</synopsis> + <description> + <para>Specifies the name of the connection from amqp.conf to use</para> + </description> + </configOption> + <configOption name="queue"> + <synopsis>Name of the queue to post to</synopsis> + <description> + <para>Defaults to asterisk_cel</para> + </description> + </configOption> + <configOption name="exchange"> + <synopsis>Name of the exchange to post to</synopsis> + <description> + <para>Defaults to empty string</para> + </description> + </configOption> + </configObject> + </configFile> + </configInfo> + ***/ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/cel.h" +#include "asterisk/channel.h" +#include "asterisk/config_options.h" +#include "asterisk/json.h" +#include "asterisk/module.h" +#include "asterisk/amqp.h" +#include "asterisk/stringfields.h" + +#define CEL_NAME "AMQP" +#define CONF_FILENAME "cel_amqp.conf" + +/*! \brief global config structure */ +struct cel_amqp_global_conf { + AST_DECLARE_STRING_FIELDS( + /*! \brief connection name */ + AST_STRING_FIELD(connection); + /*! \brief queue name */ + AST_STRING_FIELD(queue); + /*! \brief exchange name */ + AST_STRING_FIELD(exchange); + ); + + /*! \brief current connection to amqp */ + struct ast_amqp_connection *amqp; +}; + +/*! \brief cel_amqp configuration */ +struct cel_amqp_conf { + struct cel_amqp_global_conf *global; +}; + +/*! \brief Locking container for safe configuration access. */ +static AO2_GLOBAL_OBJ_STATIC(confs); + +static struct aco_type global_option = { + .type = ACO_GLOBAL, + .name = "global", + .item_offset = offsetof(struct cel_amqp_conf, global), + .category = "^global$", + .category_match = ACO_WHITELIST, +}; + +static struct aco_type *global_options[] = ACO_TYPES(&global_option); + +static void conf_global_dtor(void *obj) +{ + struct cel_amqp_global_conf *global = obj; + ao2_cleanup(global->amqp); + ast_string_field_free_memory(global); +} + +static struct cel_amqp_global_conf *conf_global_create(void) +{ + RAII_VAR(struct cel_amqp_global_conf *, global, NULL, ao2_cleanup); + + global = ao2_alloc(sizeof(*global), conf_global_dtor); + if (!global) { + return NULL; + } + + if (ast_string_field_init(global, 64) != 0) { + return NULL; + } + + aco_set_defaults(&global_option, "global", global); + + return ao2_bump(global); +} + +/*! \brief The conf file that's processed for the module. */ +static struct aco_file conf_file = { + /*! The config file name. */ + .filename = CONF_FILENAME, + /*! The mapping object types to be processed. */ + .types = ACO_TYPES(&global_option), +}; + +static void conf_dtor(void *obj) +{ + struct cel_amqp_conf *conf = obj; + + ao2_cleanup(conf->global); +} + +static void *conf_alloc(void) +{ + RAII_VAR(struct cel_amqp_conf *, conf, NULL, ao2_cleanup); + + conf = ao2_alloc_options(sizeof(*conf), conf_dtor, + AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!conf) { + return NULL; + } + + conf->global = conf_global_create(); + if (!conf->global) { + return NULL; + } + + return ao2_bump(conf); +} + +CONFIG_INFO_STANDARD(cfg_info, confs, conf_alloc, + .files = ACO_FILES(&conf_file)); + +/*! + * \brief CEL handler for AMQP. + * + * \param event CEL event. + */ +static void amqp_cel_log(struct ast_event *event) +{ + RAII_VAR(struct cel_amqp_conf *, conf, NULL, ao2_cleanup); + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref); + RAII_VAR(char *, str, NULL, ast_json_free); + const char *name; + int res; + struct ast_cel_event_record record = { + .version = AST_CEL_EVENT_RECORD_VERSION, + }; + amqp_basic_properties_t props = { + ._flags = AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_CONTENT_TYPE_FLAG, + .delivery_mode = 2, /* persistent delivery mode */ + .content_type = amqp_cstring_bytes("application/json") + }; + + conf = ao2_global_obj_ref(confs); + if (!conf || !conf->global) { + ast_log(LOG_ERROR, "Error obtaining config from cel_amqp.conf\n"); + return; + } + + if (!conf->global->amqp) { + ast_log(LOG_ERROR, "No AMQP connection; discarding CEL\n"); + return; + } + + /* Extract the data from the CEL */ + if (ast_cel_fill_record(event, &record) != 0) { + return; + } + + /* Handle user define events */ + name = record.event_name; + if (record.event_type == AST_CEL_USER_DEFINED) { + name = record.user_defined_name; + } + + /* Handle the optional extra field, although re-parsing JSON + * makes me sad :-( */ + if (strlen(record.extra) == 0) { + extra = ast_json_null(); + } else { + extra = ast_json_load_string(record.extra, NULL); + if (!extra) { + ast_log(LOG_ERROR, "Error parsing extra field\n"); + extra = ast_json_string_create(record.extra); + } + } + + json = ast_json_pack("{" + /* event_name, account_code */ + "s: s, s: s," + /* num, name, ani, rdnis, dnid */ + "s: { s: s, s: s, s: s, s: s, s: s }," + /* extension, context, channel, application */ + "s: s, s: s, s: s, s: s, " + /* app_data, event_time, amaflags, unique_id */ + "s: s, s: o, s: s, s: s, " + /* linked_id, uesr_field, peer, peer_account */ + "s: s, s: s, s: s, s: s, " + /* extra */ + "s: o" + "}", + "event_name", name, + "account_code", record.account_code, + + "caller_id", + "num", record.caller_id_num, + "name", record.caller_id_name, + "ani", record.caller_id_ani, + "rdnis", record.caller_id_rdnis, + "dnid", record.caller_id_dnid, + + "extension", record.extension, + "context", record.context, + "channel", record.channel_name, + "application", record.application_name, + + "app_data", record.application_data, + "event_time", ast_json_timeval(record.event_time, NULL), + "amaflags", ast_channel_amaflags2string(record.amaflag), + "unique_id", record.unique_id, + + "linked_id", record.linked_id, + "user_field", record.user_field, + "peer", record.peer, + "peer_acount", record.peer_account, + "extra", extra); + if (!json) { + return; + } + + /* Dump the JSON to a string for publication */ + str = ast_json_dump_string(json); + if (!str) { + return; + } + + res = ast_amqp_basic_publish(conf->global->amqp, + amqp_cstring_bytes(conf->global->exchange), + amqp_cstring_bytes(conf->global->queue), + 0, /* mandatory; don't return unsendable messages */ + 0, /* immediate; allow messages to be queued */ + &props, + amqp_cstring_bytes(str)); + + if (res != 0) { + ast_log(LOG_ERROR, "Error publishing CEL to AMQP\n"); + } +} + +static int load_config(int reload) +{ + RAII_VAR(struct cel_amqp_conf *, conf, NULL, ao2_cleanup); + RAII_VAR(struct ast_amqp_connection *, amqp, NULL, ao2_cleanup); + + switch (aco_process_config(&cfg_info, reload)) { + case ACO_PROCESS_ERROR: + return -1; + case ACO_PROCESS_OK: + case ACO_PROCESS_UNCHANGED: + break; + } + + conf = ao2_global_obj_ref(confs); + if (!conf || !conf->global) { + ast_log(LOG_ERROR, "Error obtaining config from cel_amqp.conf\n"); + return -1; + } + + /* Refresh the AMQP connection */ + ao2_cleanup(conf->global->amqp); + conf->global->amqp = ast_amqp_get_connection(conf->global->connection); + + if (!conf->global->amqp) { + ast_log(LOG_ERROR, "Could not get AMQP connection %s\n", + conf->global->connection); + return -1; + } + + return 0; +} + +static int load_module(void) +{ + if (aco_info_init(&cfg_info) != 0) { + ast_log(LOG_ERROR, "Failed to initialize config"); + aco_info_destroy(&cfg_info); + return -1; + } + + aco_option_register(&cfg_info, "connection", ACO_EXACT, + global_options, "", OPT_STRINGFIELD_T, 0, + STRFLDSET(struct cel_amqp_global_conf, connection)); + aco_option_register(&cfg_info, "queue", ACO_EXACT, + global_options, "asterisk_cel", OPT_STRINGFIELD_T, 0, + STRFLDSET(struct cel_amqp_global_conf, queue)); + aco_option_register(&cfg_info, "exchange", ACO_EXACT, + global_options, "", OPT_STRINGFIELD_T, 0, + STRFLDSET(struct cel_amqp_global_conf, exchange)); + + if (load_config(0) != 0) { + ast_log(LOG_WARNING, "Configuration failed to load\n"); + return AST_MODULE_LOAD_DECLINE; + } + + if (ast_cel_backend_register(CEL_NAME, amqp_cel_log) != 0) { + ast_log(LOG_ERROR, "Could not register CEL backend\n"); + return AST_MODULE_LOAD_FAILURE; + } + + ast_log(LOG_NOTICE, "CEL AMQP logging enabled\n"); + return AST_MODULE_LOAD_SUCCESS; +} + +static int unload_module(void) +{ + aco_info_destroy(&cfg_info); + ao2_global_obj_release(confs); + if (ast_cel_backend_unregister(CEL_NAME) != 0) { + return -1; + } + + return 0; +} + +static int reload_module(void) +{ + return load_config(1); +} + +AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "AMQP CEL Backend", + .support_level = AST_MODULE_SUPPORT_CORE, + .load = load_module, + .unload = unload_module, + .reload = reload_module, + .load_pri = AST_MODPRI_CDR_DRIVER, + ); Propchange: team/dlee/amqp-cdr-cel/cel/cel_amqp.c ------------------------------------------------------------------------------ svn:eol-style = native Propchange: team/dlee/amqp-cdr-cel/cel/cel_amqp.c ------------------------------------------------------------------------------ svn:keywords = Author Date Id Revision Propchange: team/dlee/amqp-cdr-cel/cel/cel_amqp.c ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample?view=auto&rev=430900 ============================================================================== --- team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample (added) +++ team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample Wed Jan 21 13:59:24 2015 @@ -1,0 +1,16 @@ +[general] +;enabled = yes ; Set to no to disable + +;[bunny] +;type = connection +;url = amqp://localhost ; amqp://[$USERNAME@]$HOST[:$PORT][/$VHOST] + ; username defaults to guest + ; port defaults to 5672 + ; vhost defaults to / +;password = ; Password to use for login + ; defaults to guest +;max_frame_bytes = ; Maximum frame size, in bytes; defaults to + ; AMQP_DEFAULT_FRAME_SIZE (131072, or 128KB) +;heartbeat_seconds = ; number of seconds between heartbeat frames + ; 0 disables hearbeats; defaults to + ; AMQP_DEFAULT_HEARTBEAT (0) Propchange: team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample ------------------------------------------------------------------------------ svn:eol-style = native Propchange: team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample ------------------------------------------------------------------------------ svn:keywords = Author Date Id Revision Propchange: team/dlee/amqp-cdr-cel/configs/samples/amqp.conf.sample ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample?view=auto&rev=430900 ============================================================================== --- team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample (added) +++ team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample Wed Jan 21 13:59:24 2015 @@ -1,0 +1,10 @@ +; +; cdr_amqp.conf +; + +[global] +;loguniqueid = no ; log uniqueid. Default is "no" +;loguserfield = no ; log user field. Default is "no" +;connection = bunny ; Connection name in amqp.conf +;queue = asterisk_cdr ; Queue name to publish to; defaults to asterisk_cdr +;exchange = ; Exchange to publish to; defaults to empty string Propchange: team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample ------------------------------------------------------------------------------ svn:eol-style = native Propchange: team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample ------------------------------------------------------------------------------ svn:keywords = Author Date Id Revision Propchange: team/dlee/amqp-cdr-cel/configs/samples/cdr_amqp.conf.sample ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample?view=auto&rev=430900 ============================================================================== --- team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample (added) +++ team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample Wed Jan 21 13:59:24 2015 @@ -1,0 +1,8 @@ +; +; cel_amqp.conf +; + +[global] +;connection = bunny ; Connection name in amqp.conf +;queue = asterisk_cel ; Queue name to publish to; defaults to asterisk_cel +;exchange = ; Exchange to publish to; defaults to empty string Propchange: team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample ------------------------------------------------------------------------------ svn:eol-style = native Propchange: team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample ------------------------------------------------------------------------------ svn:keywords = Author Date Id Revision Propchange: team/dlee/amqp-cdr-cel/configs/samples/cel_amqp.conf.sample ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: team/dlee/amqp-cdr-cel/configure.ac URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/configure.ac?view=diff&rev=430900&r1=430899&r2=430900 ============================================================================== --- team/dlee/amqp-cdr-cel/configure.ac (original) +++ team/dlee/amqp-cdr-cel/configure.ac Wed Jan 21 13:59:24 2015 @@ -485,6 +485,7 @@ AST_EXT_LIB_SETUP_OPTIONAL([PRI_REVERSE_CHARGE], [ISDN reverse charge], [PRI], [pri]) # ------------------------------------^ AST_EXT_LIB_SETUP([PWLIB], [PWlib], [pwlib]) +AST_EXT_LIB_SETUP([RABBITMQ], [RabbitMQ client], [rabbitmq]) AST_EXT_LIB_SETUP([RADIUS], [Radius Client], [radius]) AST_EXT_LIB_SETUP([RESAMPLE], [LIBRESAMPLE], [resample]) AST_EXT_LIB_SETUP([SDL], [Sdl], [sdl]) @@ -2195,6 +2196,8 @@ # Some distributions (like SuSE) remove the 5.1 suffix. AST_EXT_LIB_CHECK([LUA], [lua], [luaL_openlib], [lua.h], [-lm]) +AST_EXT_LIB_CHECK([RABBITMQ], [rabbitmq], [amqp_socket_open], [amqp.h]) + # Accept either RADIUS client library, their APIs are fully compatible, # just different header filenames and different SONAMEs AST_EXT_LIB_CHECK([RADIUS], [freeradius-client], [rc_read_config], [freeradius-client.h]) Modified: team/dlee/amqp-cdr-cel/contrib/scripts/install_prereq URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/contrib/scripts/install_prereq?view=diff&rev=430900&r1=430899&r2=430900 ============================================================================== --- team/dlee/amqp-cdr-cel/contrib/scripts/install_prereq (original) +++ team/dlee/amqp-cdr-cel/contrib/scripts/install_prereq Wed Jan 21 13:59:24 2015 @@ -28,12 +28,12 @@ PACKAGES_DEBIAN="$PACKAGES_DEBIAN libopenh323-dev libvpb-dev libgtk2.0-dev libmysqlclient-dev libbluetooth-dev libradiusclient-ng-dev freetds-dev" PACKAGES_DEBIAN="$PACKAGES_DEBIAN libsnmp-dev libiksemel-dev libcorosync-dev libnewt-dev libpopt-dev libical-dev libspandsp-dev libjack-dev" PACKAGES_DEBIAN="$PACKAGES_DEBIAN libresample-dev libc-client-dev binutils-dev libsrtp-dev libgsm1-dev libedit-dev doxygen libjansson-dev libldap-dev" -PACKAGES_DEBIAN="$PACKAGES_DEBIAN subversion git libxslt1-dev" +PACKAGES_DEBIAN="$PACKAGES_DEBIAN subversion git libxslt1-dev librabbitmq-dev" PACKAGES_RH="automake gcc gcc-c++ ncurses-devel openssl-devel libxml2-devel unixODBC-devel libcurl-devel libogg-devel libvorbis-devel speex-devel" PACKAGES_RH="$PACKAGES_RH spandsp-devel freetds-devel net-snmp-devel iksemel-devel corosynclib-devel newt-devel popt-devel libtool-ltdl-devel lua-devel" PACKAGES_RH="$PACKAGES_RH libsqlite3x-devel radiusclient-ng-devel portaudio-devel postgresql-devel libresample-devel neon-devel libical-devel" PACKAGES_RH="$PACKAGES_RH openldap-devel gmime22-devel sqlite2-devel mysql-devel bluez-libs-devel jack-audio-connection-kit-devel gsm-devel libedit-devel libuuid-devel" -PACKAGES_RH="$PACKAGES_RH jansson-devel libsrtp-devel pjproject-devel subversion git libxslt-devel" +PACKAGES_RH="$PACKAGES_RH jansson-devel libsrtp-devel pjproject-devel subversion git libxslt-devel librabbitmq-devel" PACKAGES_OBSD="popt gmake wget libxml libogg libvorbis curl iksemel spandsp speex iodbc freetds-0.63p1-msdblib mysql-client gmime sqlite sqlite3 jack libxslt" Added: team/dlee/amqp-cdr-cel/include/asterisk/amqp.h URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/include/asterisk/amqp.h?view=auto&rev=430900 ============================================================================== --- team/dlee/amqp-cdr-cel/include/asterisk/amqp.h (added) +++ team/dlee/amqp-cdr-cel/include/asterisk/amqp.h Wed Jan 21 13:59:24 2015 @@ -1,0 +1,85 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2015, Digium, Inc. + * + * David M. Lee, II <d...@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#ifndef _ASTERISK_AMQP_H +#define _ASTERISK_AMQP_H + +/*! \file + * \brief AMQP client + * + * \author David M. Lee, II <d...@digium.com> + * \since 13.x + * + * This file contains the Asterisk API for AMQP. Connections are configured + * in \c amqp.conf. You can get a connection by name, using \ref + * ast_amqp_get_connection(). + * + * Only publish support is implemented, using \ref ast_amqp_basic_publish(). + * + * Note that the AMQP protocol has a "channel" feature, which allows + * multiplexing multiple requests on a single TCP socket. Unfortunately, the + * underlying \c librabbitmq library is not thread safe, so we couldn't take + * advantage of this feature. Because of that, and the complications it adds + * to using the API, we've omitted that feature. + */ + +#include <amqp.h> + +/*! + * Opaque handle for the AMQP connection. + */ +struct ast_amqp_connection; + +/*! + * \brief Gets the given AMQP connection. + * + * The returned connection is an AO2 managed object, which must be freed with + * \ref ao2_cleanup(). + * + * \param name The name of the connection. + * \return The connection object. + * \return \c NULL if connection not found, or some other error. + */ +struct ast_amqp_connection *ast_amqp_get_connection(const char *name); + +/*! + * \brief Publishes a message to a AMQP connection. + * + * \param cxn The connection to publish to. + * \param exchange the exchange on the broker to publish to + * \param routing_key the routing key (queue) to use when publishing the message + * \param mandatory indicate to the broker that the message MUST be routed to a + * queue. If the broker cannot do this it should respond with + * a basic.reject method + * \param immediate indicate to the broker that the message MUST be delivered + * to a consumer immediately. If the broker cannot do this it + * should response with a basic.reject method + * \param properties Properties of the message (content-type, delivery mode, etc.) + * \param body The text of the message to send. + * \return 0 on success. + * \return -1 on failure. + */ +int ast_amqp_basic_publish(struct ast_amqp_connection *cxn, + amqp_bytes_t exchange, + amqp_bytes_t routing_key, + amqp_boolean_t mandatory, + amqp_boolean_t immediate, + const amqp_basic_properties_t *properties, + amqp_bytes_t body); + +#endif /* _ASTERISK_AMQP_H */ Propchange: team/dlee/amqp-cdr-cel/include/asterisk/amqp.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: team/dlee/amqp-cdr-cel/include/asterisk/amqp.h ------------------------------------------------------------------------------ svn:keywords = Author Date Id Revision Propchange: team/dlee/amqp-cdr-cel/include/asterisk/amqp.h ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: team/dlee/amqp-cdr-cel/include/asterisk/autoconfig.h.in URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/include/asterisk/autoconfig.h.in?view=diff&rev=430900&r1=430899&r2=430900 ============================================================================== --- team/dlee/amqp-cdr-cel/include/asterisk/autoconfig.h.in (original) +++ team/dlee/amqp-cdr-cel/include/asterisk/autoconfig.h.in Wed Jan 21 13:59:24 2015 @@ -717,6 +717,9 @@ /* Define if your system has the PWLib libraries. */ #undef HAVE_PWLIB + +/* Define to 1 if you have the RabbitMQ client library. */ +#undef HAVE_RABBITMQ /* Define to 1 if you have the Radius Client library. */ #undef HAVE_RADIUS @@ -1299,6 +1302,11 @@ /* Define to 1 if running on Darwin. */ #undef _DARWIN_UNLIMITED_SELECT +/* Enable large inode numbers on Mac OS X 10.5. */ +#ifndef _DARWIN_USE_64_BIT_INODE +# define _DARWIN_USE_64_BIT_INODE 1 +#endif + /* Number of bits in a file offset, on hosts where this is settable. */ #undef _FILE_OFFSET_BITS Modified: team/dlee/amqp-cdr-cel/include/asterisk/cel.h URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/include/asterisk/cel.h?view=diff&rev=430900&r1=430899&r2=430900 ============================================================================== --- team/dlee/amqp-cdr-cel/include/asterisk/cel.h (original) +++ team/dlee/amqp-cdr-cel/include/asterisk/cel.h Wed Jan 21 13:59:24 2015 @@ -34,6 +34,8 @@ #endif #include "asterisk/event.h" +#include "asterisk/json.h" +#include "asterisk/stringfields.h" /*! * \brief CEL event types Modified: team/dlee/amqp-cdr-cel/makeopts.in URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/makeopts.in?view=diff&rev=430900&r1=430899&r2=430900 ============================================================================== --- team/dlee/amqp-cdr-cel/makeopts.in (original) +++ team/dlee/amqp-cdr-cel/makeopts.in Wed Jan 21 13:59:24 2015 @@ -237,6 +237,9 @@ PRI_INCLUDE=@PRI_INCLUDE@ PRI_LIB=@PRI_LIB@ +RABBITMQ_INCLUDE=@RABBITMQ_INCLUDE@ +RABBITMQ_LIB=@RABBITMQ_LIB@ + RESAMPLE_INCLUDE=@RESAMPLE_INCLUDE@ RESAMPLE_LIB=@RESAMPLE_LIB@ Modified: team/dlee/amqp-cdr-cel/res/Makefile URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/res/Makefile?view=diff&rev=430900&r1=430899&r2=430900 ============================================================================== --- team/dlee/amqp-cdr-cel/res/Makefile (original) +++ team/dlee/amqp-cdr-cel/res/Makefile Wed Jan 21 13:59:24 2015 @@ -76,6 +76,7 @@ rm -f snmp/*.[oi] ael/*.[oi] ais/*.[oi] ari/*.[oi] rm -f res_pjsip/*.[oi] stasis/*.[oi] rm -f parking/*.o parking/*.i stasis_recording/*.[oi] + rm -f amqp/*.[oi] $(if $(filter res_parking,$(EMBEDDED_MODS)),modules.link,res_parking.so): $(subst .c,.o,$(wildcard parking/*.c)) $(subst .c,.o,$(wildcard parking/*.c)): _ASTCFLAGS+=$(call MOD_ASTCFLAGS,res_parking) @@ -89,5 +90,8 @@ res_stasis_recording.so: stasis_recording/stored.o stasis_recording/stored.o: _ASTCFLAGS+=$(call MOD_ASTCFLAGS,res_stasis_recording) +res_amqp.so: amqp/cli.o amqp/config.o +amqp/cli.o amqp/config.o: _ASTCFLAGS+=$(call MOD_ASTCFLAGS,res_amqp) + # Dependencies for res_ari_*.so are generated, so they're in this file include ari.make Added: team/dlee/amqp-cdr-cel/res/amqp/cli.c URL: http://svnview.digium.com/svn/asterisk/team/dlee/amqp-cdr-cel/res/amqp/cli.c?view=auto&rev=430900 ============================================================================== --- team/dlee/amqp-cdr-cel/res/amqp/cli.c (added) +++ team/dlee/amqp-cdr-cel/res/amqp/cli.c Wed Jan 21 13:59:24 2015 @@ -1,0 +1,243 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2015, Digium, Inc. + * + * David M. Lee, II <d...@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! \file + * + * \brief Command line for AMQP. + * \author David M. Lee, II <d...@digium.com> + */ + +#include "asterisk.h" + +ASTERISK_FILE_VERSION(__FILE__, "$Revision$") + +#include "asterisk/cli.h" +#include "asterisk/amqp.h" +#include "internal.h" + +#define CLI_NAME_WIDTH 15 +#define CLI_URL_WIDTH 25 + [... 1149 lines stripped ...] -- _____________________________________________________________________ -- Bandwidth and Colocation Provided by http://www.api-digital.com -- svn-commits mailing list To UNSUBSCRIBE or update options visit: http://lists.digium.com/mailman/listinfo/svn-commits