Modified: webservices/savan/trunk/c/src/util/savan_util.c URL: http://svn.apache.org/viewvc/webservices/savan/trunk/c/src/util/savan_util.c?rev=749604&r1=749603&r2=749604&view=diff ============================================================================== --- webservices/savan/trunk/c/src/util/savan_util.c (original) +++ webservices/savan/trunk/c/src/util/savan_util.c Tue Mar 3 12:44:05 2009 @@ -16,10 +16,10 @@ #include <axis2_msg_info_headers.h> #include <axis2_options.h> -#include <axis2_svc_client.h> #include <axis2_engine.h> #include <axis2_core_utils.h> #include <axis2_endpoint_ref.h> +#include <axis2_svc_client.h> #include <platforms/axutil_platform_auto_sense.h> #include <axiom_soap.h> #include <axiom_soap_const.h> @@ -30,7 +30,7 @@ #include <savan_util.h> #include <savan_msg_recv.h> #include <savan_error.h> -#include <savan_db_mgr.h> +#include <savan_storage_mgr.h> #ifdef SAVAN_FILTERING #include <libxslt/xsltutils.h> #endif @@ -42,42 +42,6 @@ const xmlChar* value); #endif -static axis2_status_t -add_subscriber_to_remote_subs_mgr( - const axutil_env_t *env, - savan_subscriber_t *subscriber, - axis2_char_t *subs_mgr_url); - -static axiom_node_t * -build_add_subscriber_om_payload( - const axutil_env_t *env, - savan_subscriber_t *subscriber); - -static axiom_node_t * -build_subscriber_request_om_payload( - const axutil_env_t *env, - axis2_char_t *subs_id); - -static axiom_node_t * -build_subscribers_request_om_payload( - const axutil_env_t *env, - axis2_char_t *topic); - -static axiom_node_t * -build_topics_request_om_payload( - const axutil_env_t *env); - -static axutil_array_list_t * -process_subscriber_list_node( - const axutil_env_t *env, - axiom_node_t *subs_list_node, - axis2_conf_t *conf); - -static axutil_array_list_t * -process_topic_list_node( - const axutil_env_t *env, - axiom_node_t *subs_list_node); - axis2_status_t AXIS2_CALL savan_util_create_fault_envelope( axis2_msg_ctx_t *msg_ctx, @@ -411,7 +375,8 @@ savan_util_get_subscriber_from_msg( const axutil_env_t *env, axis2_msg_ctx_t *msg_ctx, - axis2_char_t *sub_id) + savan_storage_mgr_t *storage_mgr, + const axis2_char_t *sub_id) { savan_subscriber_t *subscriber = NULL; @@ -422,69 +387,131 @@ { sub_id = savan_util_get_subscription_id_from_msg(env, msg_ctx); } - { - axis2_char_t sql_retrieve[256]; - axis2_conf_ctx_t *conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env); - axis2_conf_t *conf = axis2_conf_ctx_get_conf(conf_ctx, env); - - sprintf(sql_retrieve, "select id, end_to, notify_to, delivery_mode, "\ - "expires, filter, renewed, topic_url from subscriber, topic"\ - " where id='%s' and topic.topic_name=subscriber.topic_name;", sub_id); - subscriber = savan_db_mgr_retrieve(env, savan_util_get_dbname(env, conf), - savan_db_mgr_subs_retrieve_callback, sql_retrieve); - } + subscriber = savan_storage_mgr_retrieve_subscriber(storage_mgr, env, sub_id); AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_util_get_subscriber_from_msg"); return subscriber; } -axis2_status_t AXIS2_CALL -savan_util_add_subscriber( - const axutil_env_t *env, - axis2_msg_ctx_t *msg_ctx, - savan_subscriber_t *subscriber) +savan_subscriber_t * AXIS2_CALL +savan_util_get_subscriber_from_renew_msg( + const axutil_env_t *env, + axis2_msg_ctx_t *msg_ctx, + savan_storage_mgr_t *storage_mgr, + const axis2_char_t *sub_id) { - axis2_conf_ctx_t *conf_ctx = NULL; - axis2_conf_t *conf = NULL; - axis2_module_desc_t *module_desc = NULL; - axis2_svc_t *pubs_svc = NULL; - axutil_param_t *param = NULL; - axis2_endpoint_ref_t *topic_epr = NULL; - axis2_char_t *topic_url = NULL; - axis2_status_t status = AXIS2_FAILURE; + savan_subscriber_t *subscriber = NULL; + axiom_soap_envelope_t *envelope = NULL; + axiom_soap_header_t *header = NULL; axutil_qname_t *qname = NULL; + axiom_node_t *header_node = NULL; + axiom_node_t *id_node = NULL; + axiom_element_t *id_elem = NULL; + axiom_node_t *expires_node = NULL; + axiom_element_t *expires_elem = NULL; + axiom_node_t *renew_node = NULL; + axiom_element_t *renew_elem = NULL; + axiom_element_t *header_elem = NULL; + axis2_char_t *expires = NULL; + axis2_char_t *renewed_expires = NULL; + axiom_soap_body_t *body = NULL; + axiom_node_t *body_node = NULL; + axiom_element_t *body_elem = NULL; - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_util_add_subscriber"); - conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env); - topic_epr = axis2_msg_ctx_get_to(msg_ctx, env); - topic_url = (axis2_char_t *) axis2_endpoint_ref_get_address(topic_epr, env); - pubs_svc = axis2_msg_ctx_get_svc(msg_ctx, env); - if (!pubs_svc) + AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_util_get_subscriber_from_renew_msg"); + + /* Extract subscription id from msg if not already given */ + if (!sub_id) { - AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, - "[savan] Failed to extract the %s publisher service", topic_url); - return AXIS2_FAILURE; + sub_id = savan_util_get_subscription_id_from_msg(env, msg_ctx); } - conf = axis2_conf_ctx_get_conf(conf_ctx, env); - qname = axutil_qname_create(env, SAVAN_MODULE, NULL, NULL); - module_desc = axis2_conf_get_module(conf, env, qname); - param = axis2_module_desc_get_param(module_desc, env, SAVAN_SUBSCRIPTION_MGR_URL); + + subscriber = savan_storage_mgr_retrieve_subscriber(storage_mgr, env, sub_id); + + /* Get soap envelop and extract the subscription id */ + + envelope = axis2_msg_ctx_get_soap_envelope(msg_ctx, env); + if (!envelope) + { + AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract the soap envelop"); + return NULL; + } + + header = axiom_soap_envelope_get_header(envelope, env); + if (!header) + { + AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract the soap header"); + return NULL; + } + + /* Get header element from header node */ + header_node = axiom_soap_header_get_base_node(header, env); + header_elem = (axiom_element_t*)axiom_node_get_data_element(header_node, env); + + /* Get Identifier element from header */ + qname = axutil_qname_create(env, ELEM_NAME_ID, EVENTING_NAMESPACE, NULL); + id_elem = axiom_element_get_first_child_with_qname(header_elem, env, qname, + header_node, &id_node); axutil_qname_free(qname, env); - if(param) + + /* Now read the id */ + sub_id = axiom_element_get_text(id_elem, env, id_node); + + /* Get Expires element from body */ + body = axiom_soap_envelope_get_body(envelope, env); + if (!body) { - axis2_char_t *subs_mgr_url = NULL; - subs_mgr_url = axutil_param_get_value(param, env); - status = add_subscriber_to_remote_subs_mgr(env, subscriber, subs_mgr_url); + AXIS2_HANDLE_ERROR(env, AXIS2_ERROR_SOAP_ENVELOPE_OR_SOAP_BODY_NULL, AXIS2_FAILURE); + AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to extract the soap body"); + return NULL; } - else + + body_node = axiom_soap_body_get_base_node(body, env); + body_elem = (axiom_element_t*)axiom_node_get_data_element(body_node, env); + + /* Get Subscribe element from Body */ + qname = axutil_qname_create(env, ELEM_NAME_RENEW, EVENTING_NAMESPACE, NULL); + renew_elem = axiom_element_get_first_child_with_qname(body_elem, env, qname, body_node, + &renew_node); + axutil_qname_free(qname, env); + + qname = axutil_qname_create(env, ELEM_NAME_EXPIRES, EVENTING_NAMESPACE, NULL); + expires_elem = axiom_element_get_first_child_with_qname(renew_elem, env, qname, renew_node, + &expires_node); + axutil_qname_free(qname, env); + if(expires_elem) { - axis2_conf_ctx_t *conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env); - axis2_conf_t *conf = axis2_conf_ctx_get_conf(conf_ctx, env); - - status = savan_db_mgr_insert_subscriber(env, savan_util_get_dbname(env, conf), subscriber); + expires = axiom_element_get_text(expires_elem, env, expires_node); + if(expires) + { + /* Check whether the subscription can be renewed. If renewable, set the new + * expiry time in the subscriber */ + savan_subscriber_set_expires(subscriber, env, expires); + renewed_expires = savan_util_get_renewed_expiry_time(env, expires); + savan_subscriber_set_expires(subscriber, env, renewed_expires); + } } + + savan_subscriber_set_renew_status(subscriber, env, AXIS2_TRUE); + AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_util_get_subscriber_from_renew_msg"); + + return subscriber; +} + +axis2_status_t AXIS2_CALL +savan_util_add_subscriber( + const axutil_env_t *env, + axis2_msg_ctx_t *msg_ctx, + savan_storage_mgr_t *storage_mgr, + savan_subscriber_t *subscriber) +{ + axis2_status_t status = AXIS2_FAILURE; + + AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_util_add_subscriber"); + + status = savan_storage_mgr_insert_subscriber(storage_mgr, env, subscriber); if(status) { axutil_property_t *subs_prop = NULL; @@ -501,19 +528,12 @@ savan_util_update_subscriber( const axutil_env_t *env, axis2_msg_ctx_t *msg_ctx, + savan_storage_mgr_t *storage_mgr, savan_subscriber_t *subscriber) { - axis2_conf_ctx_t *conf_ctx = NULL; - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_util_update_subscriber"); - conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env); - { - axis2_conf_ctx_t *conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env); - axis2_conf_t *conf = axis2_conf_ctx_get_conf(conf_ctx, env); - - savan_db_mgr_update_subscriber(env, savan_util_get_dbname(env, conf), subscriber); - } + savan_storage_mgr_update_subscriber(storage_mgr, env, subscriber); AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_util_update_subscriber"); return AXIS2_SUCCESS; @@ -523,23 +543,19 @@ savan_util_remove_subscriber( const axutil_env_t *env, axis2_msg_ctx_t *msg_ctx, + savan_storage_mgr_t *storage_mgr, savan_subscriber_t *subscriber) { AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_util_remove_subscriber"); { - axis2_char_t *subs_id = NULL; - axis2_char_t sql_remove[256]; - axis2_conf_ctx_t *conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env); - axis2_conf_t *conf = axis2_conf_ctx_get_conf(conf_ctx, env); + const axis2_char_t *subs_id = NULL; subs_id = savan_subscriber_get_id(subscriber, env); /* Extract the store from the svc and remove the given subscriber */ - sprintf(sql_remove, "delete from subscriber where id='%s'", subs_id); - - savan_db_mgr_remove(env, savan_util_get_dbname(env, conf), sql_remove); + savan_storage_mgr_remove_subscriber(storage_mgr, env, subs_id); } AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_util_remove_subscriber"); @@ -581,474 +597,6 @@ return AXIS2_SUCCESS; } -static axis2_status_t -add_subscriber_to_remote_subs_mgr( - const axutil_env_t *env, - savan_subscriber_t *subscriber, - axis2_char_t *subs_mgr_url) -{ - const axis2_char_t *address = NULL; - axis2_endpoint_ref_t* endpoint_ref = NULL; - axis2_options_t *options = NULL; - axis2_svc_client_t* svc_client = NULL; - axiom_node_t *payload = NULL; - - svc_client = (axis2_svc_client_t *) savan_util_get_svc_client(env); - options = (axis2_options_t *) axis2_svc_client_get_options(svc_client, env); - address = subs_mgr_url; - endpoint_ref = axis2_endpoint_ref_create(env, address); - axis2_options_set_to(options, env, endpoint_ref); - axis2_options_set_action(options, env, - "http://ws.apache.org/axis2/c/subscription/add_subscriber"); - - payload = build_add_subscriber_om_payload(env, subscriber); - /* Send request */ - axis2_svc_client_send_robust(svc_client, env, payload); - if(svc_client) - axis2_svc_client_free(svc_client, env); - - return AXIS2_SUCCESS; -} - -/*static axis2_status_t -remove_subscriber_from_remote_subs_mgr( - const axutil_env_t *env, - savan_subscriber_t *subscriber, - axis2_char_t *subs_mgr_url) -{ - const axis2_char_t *address = NULL; - axis2_endpoint_ref_t* endpoint_ref = NULL; - axis2_options_t *options = NULL; - axis2_svc_client_t* svc_client = NULL; - axiom_node_t *payload = NULL; - - svc_client = (axis2_svc_client_t *) savan_util_get_svc_client(env); - options = (axis2_options_t *) axis2_svc_client_get_options(svc_client, env); - address = subs_mgr_url; - endpoint_ref = axis2_endpoint_ref_create(env, address); - axis2_options_set_to(options, env, endpoint_ref); - axis2_options_set_action(options, env, - "http://ws.apache.org/axis2/c/subscription/remove_subscriber"); - - payload = build_remove_subscriber_om_payload(env, subscriber); - // Send request - axis2_svc_client_send_robust(svc_client, env, payload); - if(svc_client) - axis2_svc_client_free(svc_client, env); - - return AXIS2_SUCCESS; -}*/ - -savan_subscriber_t *AXIS2_CALL -savan_util_get_subscriber_from_remote_subs_mgr( - const axutil_env_t *env, - axis2_char_t *subs_id, - axis2_char_t *subs_mgr_url, - void *s_client, - axis2_conf_t *conf) -{ - axis2_endpoint_ref_t* endpoint_ref = NULL; - axis2_options_t *options = NULL; - axis2_svc_client_t* svc_client = NULL; - axiom_node_t *payload = NULL; - axiom_node_t *ret_node = NULL; - savan_subscriber_t *subscriber = NULL; - - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, - "[savan] Entry:savan_util_get_subscriber_from_remote_subs_mgr"); - - svc_client = (axis2_svc_client_t *) s_client; - options = (axis2_options_t *)axis2_svc_client_get_options(svc_client, env); - endpoint_ref = axis2_endpoint_ref_create(env, subs_mgr_url); - axis2_options_set_to(options, env, endpoint_ref); - - payload = build_subscriber_request_om_payload(env, subs_id); - ret_node = axis2_svc_client_send_receive(svc_client, env, payload); - if (ret_node) - { - subscriber = savan_util_process_savan_specific_subscriber_node(env, ret_node, conf); - } - else - { - AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Stub invoke FAILED: Error code:" - " %d :: %s", env->error->error_number, AXIS2_ERROR_GET_MESSAGE(env->error)); - } - - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, - "[savan] Exit:savan_util_get_subscriber_from_remote_subs_mgr"); - return subscriber; -} - -static axiom_node_t * -build_subscriber_request_om_payload( - const axutil_env_t *env, - axis2_char_t *subs_id) -{ - axiom_node_t *om_node = NULL; - axiom_element_t* om_ele = NULL; - axiom_node_t* subs_id_om_node = NULL; - axiom_element_t * subs_id_om_ele = NULL; - axiom_namespace_t *ns1 = NULL; - axis2_char_t *om_str = NULL; - - ns1 = axiom_namespace_create (env, SAVAN_NAMESPACE, SAVAN_NS_PREFIX); - om_ele = axiom_element_create(env, NULL, ELEM_NAME_GET_SUBSCRIBER, ns1, &om_node); - subs_id_om_ele = axiom_element_create(env, om_node, ELEM_NAME_SUBSCRIBER_ID, ns1, - &subs_id_om_node); - axiom_element_set_text(subs_id_om_ele, env, subs_id, subs_id_om_node); - - om_str = axiom_node_to_string(om_node, env); - if (om_str) - { - AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "Sending OM : %s", om_str); - AXIS2_FREE(env->allocator, om_str); - om_str = NULL; - } - return om_node; -} - -savan_subscriber_t *AXIS2_CALL -savan_util_process_savan_specific_subscriber_node( - const axutil_env_t *env, - axiom_node_t *subs_node, - axis2_conf_t *conf) -{ - axiom_element_t *subs_elem = NULL; - axiom_node_t *sub_node = NULL; - axiom_element_t *sub_elem = NULL; - axutil_qname_t *qname = NULL; - axiom_node_t *id_node = NULL; - axiom_element_t *id_elem = NULL; - axis2_char_t *id = NULL; - axiom_node_t *topic_node = NULL; - axiom_element_t *topic_elem = NULL; - savan_subscriber_t *subscriber = NULL; - axis2_status_t status = AXIS2_SUCCESS; - - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, - "[savan] Entry:savan_util_process_savan_specific_subscriber_node"); - - AXIS2_PARAM_CHECK(env->error, subs_node, AXIS2_FAILURE); - - subscriber = savan_subscriber_create(env); - if (!subscriber) - { - AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Failed to create a subscriber instance"); - AXIS2_ERROR_SET(env->error, SAVAN_ERROR_FAILED_TO_CREATE_SUBSCRIBER, AXIS2_FAILURE); - return NULL; - } - - subs_elem = axiom_node_get_data_element(subs_node, env); - - /* Id */ - qname = axutil_qname_create(env, ELEM_NAME_ID, SAVAN_NAMESPACE, NULL); - id_elem = axiom_element_get_first_child_with_qname(subs_elem, env, qname, subs_node, &id_node); - axutil_qname_free(qname, env); - id = axiom_element_get_text(id_elem, env, id_node); - savan_subscriber_set_id(subscriber, env, id); - AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Received subscriber id:%s", id); - - /* Topic */ - qname = axutil_qname_create(env, ELEM_NAME_TOPIC, SAVAN_NAMESPACE, NULL); - topic_elem = axiom_element_get_first_child_with_qname(subs_elem, env, qname, subs_node, &topic_node); - axutil_qname_free(qname, env); - if(topic_elem) - { - axis2_char_t *topic_url = NULL; - - topic_url = axiom_element_get_text(topic_elem, env, topic_node); - savan_subscriber_set_topic_url(subscriber, env, topic_url); - status = savan_util_populate_topic(env, topic_url, conf); - if(status != AXIS2_SUCCESS) - { - AXIS2_ERROR_SET(env->error, SAVAN_ERROR_COULD_NOT_POPULATE_TOPIC, AXIS2_FAILURE); - AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Could not populate topic"); - return NULL; - } - - AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Received subscriber topic:%s", topic_url); - } - - qname = axutil_qname_create(env, ELEM_NAME_SUBSCRIBE, EVENTING_NAMESPACE, NULL); - sub_elem = axiom_element_get_first_child_with_qname(subs_elem, env, qname, subs_node, &sub_node); - axutil_qname_free(qname, env); - - if(sub_node) - { - /* Now read each sub element of Subscribe element */ - status = savan_util_process_subscriber_node(env, sub_node, sub_elem, subscriber); - if(AXIS2_SUCCESS != status) - { - AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[savan] Parsing subscriber node failed"); - AXIS2_ERROR_SET(env->error, SAVAN_ERROR_PARSING_SUBSCRIBER_NODE_FAILED, AXIS2_FAILURE); - return NULL; - } - } - - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, - "[savan] Exit:savan_util_process_savan_specific_subscriber_node"); - return subscriber; -} - -axutil_array_list_t *AXIS2_CALL -savan_util_get_subscriber_list_from_remote_subs_mgr( - const axutil_env_t *env, - axis2_char_t *topic, - axis2_char_t *subs_mgr_url, - void *s_client, - axis2_conf_t *conf) -{ - axis2_endpoint_ref_t* endpoint_ref = NULL; - axis2_options_t *options = NULL; - axis2_svc_client_t* svc_client = NULL; - axiom_node_t *payload = NULL; - axiom_node_t *ret_node = NULL; - axutil_array_list_t *subscriber_list = NULL; - - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, - "[savan] Entry:savan_util_get_subscriber_list_from_remote_subs_mgr"); - - svc_client = (axis2_svc_client_t *) s_client; - options = (axis2_options_t *)axis2_svc_client_get_options(svc_client, - env); - endpoint_ref = axis2_endpoint_ref_create(env, subs_mgr_url); - axis2_options_set_to(options, env, endpoint_ref); - - payload = build_subscribers_request_om_payload(env, topic); - ret_node = axis2_svc_client_send_receive(svc_client, env, payload); - if (ret_node) - { - subscriber_list = process_subscriber_list_node(env, ret_node, conf); - } - else - { - AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, - "[savan] Stub invoke FAILED: Error code:" - " %d :: %s", env->error->error_number, - AXIS2_ERROR_GET_MESSAGE(env->error)); - } - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, - "[savan] Exit:savan_util_get_subscriber_list_from_remote_subs_mgr"); - return subscriber_list; -} - -static axiom_node_t * -build_subscribers_request_om_payload( - const axutil_env_t *env, - axis2_char_t *topic) -{ - axiom_node_t *om_node = NULL; - axiom_element_t* om_ele = NULL; - axiom_node_t* topic_om_node = NULL; - axiom_element_t * topic_om_ele = NULL; - axiom_namespace_t *ns1 = NULL; - axis2_char_t *om_str = NULL; - - ns1 = axiom_namespace_create (env, SAVAN_NAMESPACE, SAVAN_NS_PREFIX); - om_ele = axiom_element_create(env, NULL, ELEM_NAME_GET_SUBSCRIBER_LIST, ns1, &om_node); - topic_om_ele = axiom_element_create(env, om_node, ELEM_NAME_TOPIC, ns1, - &topic_om_node); - axiom_element_set_text(topic_om_ele, env, topic, topic_om_node); - - om_str = axiom_node_to_string(om_node, env); - if (om_str) - { - AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "Sending OM : %s", om_str); - AXIS2_FREE(env->allocator, om_str); - om_str = NULL; - } - return om_node; -} - -static axutil_array_list_t * -process_subscriber_list_node( - const axutil_env_t *env, - axiom_node_t *subs_list_node, - axis2_conf_t *conf) -{ - axiom_element_t *subs_list_element = NULL; - axiom_children_qname_iterator_t *subs_iter = NULL; - axutil_qname_t *qname = NULL; - axutil_array_list_t *subscriber_list = NULL; - axis2_status_t status = AXIS2_SUCCESS; - - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, - "[savan] Entry:process_subscriber_list_node"); - subs_list_element = axiom_node_get_data_element(subs_list_node, env); - - /* Get Subscriber elements from subscriber list */ - qname = axutil_qname_create(env, ELEM_NAME_SUBSCRIBER, SAVAN_NAMESPACE, - NULL); - subs_iter = axiom_element_get_children_with_qname(subs_list_element, env, - qname, subs_list_node); - axutil_qname_free(qname, env); - if(!subs_iter) - { - AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Subscribers list is empty"); - return NULL; - } - - if(axiom_children_qname_iterator_has_next(subs_iter, env)) - { - subscriber_list = axutil_array_list_create(env, 0); - } - - while(axiom_children_qname_iterator_has_next(subs_iter, env)) - { - savan_subscriber_t *subscriber = NULL; - axiom_node_t *subs_node = NULL; - - subs_node = axiom_children_qname_iterator_next(subs_iter, env); - if(subs_node) /* Iterate Savan specific subscriber elements */ - { - /* Now read Savan specific Subscribe element */ - subscriber = savan_util_process_savan_specific_subscriber_node(env, subs_node, conf); - if(!subscriber) - { - AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, - "[savan] Failed process Savan specific Subscriber element"); - status = axutil_error_get_status_code(env->error); - return NULL; - - } - - axutil_array_list_add(subscriber_list, env, subscriber); - } - } - - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:process_subscriber_list_node"); - return subscriber_list; -} - -static axiom_node_t * -build_add_subscriber_om_payload( - const axutil_env_t *env, - savan_subscriber_t *subscriber) -{ - axiom_node_t *add_node = NULL; - axiom_element_t* add_ele = NULL; - axiom_namespace_t *ns = NULL; - axiom_namespace_t *ns1 = NULL; - axiom_node_t *sub_node = NULL; - axiom_node_t *id_node = NULL; - axiom_node_t *topic_node = NULL; - axiom_node_t *endto_node = NULL; - axiom_node_t *delivery_node = NULL; - axiom_node_t *notify_node = NULL; - axiom_node_t *filter_node = NULL; - axiom_node_t *expires_node = NULL; - axiom_element_t* sub_elem = NULL; - axiom_element_t* id_elem = NULL; - axiom_element_t* topic_elem = NULL; - axiom_element_t* endto_elem = NULL; - axiom_element_t* delivery_elem = NULL; - axiom_element_t* notify_elem = NULL; - axiom_element_t* filter_elem = NULL; - axiom_element_t* expires_elem = NULL; - const axis2_char_t *endto = NULL; - const axis2_char_t *notify = NULL; - axis2_char_t *filter = NULL; - const axis2_char_t *expires = NULL; - axis2_char_t *topic_url = NULL; - axis2_char_t *id = NULL; - axis2_endpoint_ref_t *notify_ref = NULL; - axis2_endpoint_ref_t *endto_ref = savan_subscriber_get_end_to(subscriber, env); - - if(endto_ref) - { - endto = axis2_endpoint_ref_get_address(endto_ref, env); - } - - notify_ref = savan_subscriber_get_notify_to(subscriber, env); - if(notify_ref) - { - notify = axis2_endpoint_ref_get_address(notify_ref, env); - } - - filter = savan_subscriber_get_filter(subscriber, env); - expires = savan_subscriber_get_expires(subscriber, env); - id = savan_subscriber_get_id(subscriber, env); - - ns = axiom_namespace_create (env, EVENTING_NAMESPACE, EVENTING_NS_PREFIX); - ns1 = axiom_namespace_create (env, SAVAN_NAMESPACE, SAVAN_NS_PREFIX); - add_ele = axiom_element_create(env, NULL, ELEM_NAME_ADD_SUBSCRIBER, ns1, &add_node); - - /* create the id element */ - if(id) - { - id_elem = axiom_element_create(env, add_node, ELEM_NAME_ID, ns1, &id_node); - axiom_element_set_text(id_elem, env, id, id_node); - } - /* create the topic element */ - topic_elem = axiom_element_create(env, add_node, ELEM_NAME_TOPIC, ns1, &topic_node); - topic_url = savan_subscriber_get_topic_url(subscriber, env); - if(topic_url) - { - axiom_element_set_text(topic_elem, env, topic_url, topic_node); - } - - /* create the subscriber element */ - sub_elem = axiom_element_create(env, add_node, ELEM_NAME_SUBSCRIBE, ns, &sub_node); - - /* EndTo element */ - endto_elem = axiom_element_create(env, sub_node, ELEM_NAME_ENDTO, ns, &endto_node); - axiom_element_set_text(endto_elem, env, endto, endto_node); - - /* Delivery element */ - delivery_elem = axiom_element_create(env, sub_node, ELEM_NAME_DELIVERY, ns, &delivery_node); - - notify_elem = axiom_element_create(env, delivery_node, ELEM_NAME_NOTIFYTO, ns, ¬ify_node); - axiom_element_set_text(notify_elem, env, notify, notify_node); - - /* Expires element */ - expires_elem = axiom_element_create(env, sub_node, ELEM_NAME_EXPIRES, ns, &expires_node); - axiom_element_set_text(expires_elem, env, expires, expires_node); - - /* Filter element */ - filter_elem = axiom_element_create(env, sub_node, ELEM_NAME_FILTER, ns, &endto_node); - axiom_element_set_text(filter_elem, env, filter, filter_node); - - return add_node; -} - -/*static axiom_node_t * -build_remove_subscriber_om_payload( - const axutil_env_t *env, - savan_subscriber_t *subscriber) -{ - axiom_node_t *remove_node = NULL; - axiom_element_t* remove_ele = NULL; - axiom_namespace_t *ns = NULL; - axiom_namespace_t *ns1 = NULL; - axiom_node_t *id_node = NULL; - axiom_node_t *topic_node = NULL; - axiom_element_t* id_elem = NULL; - axiom_element_t* topic_elem = NULL; - axis2_char_t *topic = NULL; - axis2_char_t *id = NULL; - - id = savan_subscriber_get_id(subscriber, env); - - ns = axiom_namespace_create (env, EVENTING_NAMESPACE, EVENTING_NS_PREFIX); - ns1 = axiom_namespace_create (env, SAVAN_NAMESPACE, SAVAN_NS_PREFIX); - remove_ele = axiom_element_create(env, NULL, ELEM_NAME_REMOVE_SUBSCRIBER, - ns1, &remove_node); - - // create the id element - if(id) - { - id_elem = axiom_element_create(env, remove_node, ELEM_NAME_ID, ns1, &id_node); - axiom_element_set_text(id_elem, env, id, id_node); - } - // create the topic element - topic_elem = axiom_element_create(env, remove_node, ELEM_NAME_TOPIC, ns1, &topic_node); - topic = savan_subscriber_get_topic(subscriber, env); - if(topic) - axiom_element_set_text(topic_elem, env, topic, topic_node); - - return remove_node; -}*/ - axis2_char_t * AXIS2_CALL savan_util_get_expiry_time( const axutil_env_t *env) @@ -1063,9 +611,9 @@ const axutil_env_t *env, axis2_char_t *expiry) { - /* TODO: decide how to renew expiry time */ - - return NULL; + /* TODO: Decide how to renew expiry time, may be using policy. Currently honor the requested. */ + + return expiry; } axis2_char_t *AXIS2_CALL @@ -1073,10 +621,13 @@ const axutil_env_t *env, axis2_char_t *topic_url) { + AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "came"); axis2_char_t *topic = NULL; axis2_char_t *temp = NULL; temp = axutil_rindex(topic_url, AXIS2_PATH_SEP_CHAR) + 1; + AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "topic:%s", temp); + AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "topic2:%s", topic_url); if(temp) { if(axutil_strchr(temp, '"')) @@ -1091,129 +642,6 @@ return topic; } -axutil_array_list_t *AXIS2_CALL -savan_util_get_topic_list_from_remote_subs_mgr( - const axutil_env_t *env, - axis2_char_t *subs_mgr_url, - void *s_client) -{ - axis2_endpoint_ref_t* endpoint_ref = NULL; - axis2_options_t *options = NULL; - axis2_svc_client_t* svc_client = NULL; - axiom_node_t *payload = NULL; - axiom_node_t *ret_node = NULL; - axutil_array_list_t *topic_list = NULL; - - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, - "[savan] Entry:savan_util_get_topic_list_from_remote_subs_mgr"); - - if(!s_client) - { - svc_client = (axis2_svc_client_t *) savan_util_get_svc_client(env); - } - else - { - svc_client = (axis2_svc_client_t *) s_client; - } - options = (axis2_options_t *) axis2_svc_client_get_options(svc_client, env); - endpoint_ref = axis2_endpoint_ref_create(env, subs_mgr_url); - axis2_options_set_to(options, env, endpoint_ref); - - payload = build_topics_request_om_payload(env); - ret_node = axis2_svc_client_send_receive(svc_client, env, payload); - if (ret_node) - { - topic_list = process_topic_list_node(env, ret_node); - } - else - { - AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, - "[savan] Stub invoke FAILED: Error code:" - " %d :: %s", env->error->error_number, - AXIS2_ERROR_GET_MESSAGE(env->error)); - } - if(!s_client && svc_client) - { - /*axis2_svc_client_free(svc_client, env);*/ - } - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, - "[savan] Exit:savan_util_get_topic_list_from_remote_subs_mgr"); - return topic_list; -} - -static axiom_node_t * -build_topics_request_om_payload( - const axutil_env_t *env) -{ - axiom_node_t *om_node = NULL; - axiom_element_t* om_ele = NULL; - axiom_namespace_t *ns1 = NULL; - axis2_char_t *om_str = NULL; - - ns1 = axiom_namespace_create (env, SAVAN_NAMESPACE, SAVAN_NS_PREFIX); - om_ele = axiom_element_create(env, NULL, ELEM_NAME_GET_TOPIC_LIST, ns1, &om_node); - om_str = axiom_node_to_string(om_node, env); - if (om_str) - { - AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, - "[savan] Sending topics_request_om_payload: %s", om_str); - AXIS2_FREE(env->allocator, om_str); - om_str = NULL; - } - return om_node; -} - -static axutil_array_list_t * -process_topic_list_node( - const axutil_env_t *env, - axiom_node_t *topic_list_node) -{ - axiom_element_t *topic_list_element = NULL; - axiom_children_qname_iterator_t *topic_iter = NULL; - axutil_qname_t *qname = NULL; - axutil_array_list_t *topic_list = NULL; - - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:process_topic_list_node"); - - topic_list_element = axiom_node_get_data_element(topic_list_node, env); - - /* Get topic elements from topic list */ - qname = axutil_qname_create(env, ELEM_NAME_TOPIC, SAVAN_NAMESPACE, NULL); - topic_iter = axiom_element_get_children_with_qname(topic_list_element, env, qname, - topic_list_node); - - axutil_qname_free(qname, env); - if(!topic_iter) - { - AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Topic list is empty"); - return NULL; - } - - if(axiom_children_qname_iterator_has_next(topic_iter, env)) - { - topic_list = axutil_array_list_create(env, 0); - } - - while(axiom_children_qname_iterator_has_next(topic_iter, env)) - { - axiom_node_t *topic_node = NULL; - axiom_element_t *topic_elem = NULL; - axis2_char_t *topic_url_str = NULL; - - topic_node = axiom_children_qname_iterator_next(topic_iter, env); - if(topic_node) - { - topic_elem = axiom_node_get_data_element(topic_node, env); - topic_url_str = axiom_element_get_text(topic_elem, env, topic_node); - axutil_array_list_add(topic_list, env, axutil_strdup(env, topic_url_str)); - AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "topic_url_str:%s", topic_url_str); - } - } - - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:process_topic_list_node"); - return topic_list; -} - void *AXIS2_CALL savan_util_get_svc_client( const axutil_env_t *env) @@ -1245,11 +673,36 @@ } AXIS2_EXTERN axis2_char_t *AXIS2_CALL -savan_util_get_dbname( +savan_util_get_resource_connection_string( + const axutil_env_t *env, + axis2_conf_t *conf) +{ + axis2_char_t *resource_str = "./savan_db"; + axis2_module_desc_t *module_desc = NULL; + axutil_qname_t *qname = NULL; + + qname = axutil_qname_create(env, SAVAN_MODULE, NULL, NULL); + module_desc = axis2_conf_get_module(conf, env, qname); + if(module_desc) + { + axutil_param_t *resource_param = NULL; + resource_param = axis2_module_desc_get_param(module_desc, env, SAVAN_RESOURCE); + if(resource_param) + { + resource_str = (axis2_char_t *) axutil_param_get_value(resource_param, env); + } + } + axutil_qname_free(qname, env); + + return resource_str; +} + +AXIS2_EXTERN axis2_char_t *AXIS2_CALL +savan_util_get_resource_username( const axutil_env_t *env, axis2_conf_t *conf) { - axis2_char_t *path = "./savan_db"; + axis2_char_t *username = "admin"; axis2_module_desc_t *module_desc = NULL; axutil_qname_t *qname = NULL; @@ -1257,16 +710,41 @@ module_desc = axis2_conf_get_module(conf, env, qname); if(module_desc) { - axutil_param_t *db_param = NULL; - db_param = axis2_module_desc_get_param(module_desc, env, SAVAN_DB); - if(db_param) + axutil_param_t *param = NULL; + param = axis2_module_desc_get_param(module_desc, env, SAVAN_RESOURCE_USERNAME); + if(param) { - path = (axis2_char_t *) axutil_param_get_value(db_param, env); + username = (axis2_char_t *) axutil_param_get_value(param, env); } } axutil_qname_free(qname, env); - return path; + return username; +} + +AXIS2_EXTERN axis2_char_t *AXIS2_CALL +savan_util_get_resource_password( + const axutil_env_t *env, + axis2_conf_t *conf) +{ + axis2_char_t *password = "password"; + axis2_module_desc_t *module_desc = NULL; + axutil_qname_t *qname = NULL; + + qname = axutil_qname_create(env, SAVAN_MODULE, NULL, NULL); + module_desc = axis2_conf_get_module(conf, env, qname); + if(module_desc) + { + axutil_param_t *param = NULL; + param = axis2_module_desc_get_param(module_desc, env, SAVAN_RESOURCE_PASSWORD); + if(param) + { + password = (axis2_char_t *) axutil_param_get_value(param, env); + } + } + axutil_qname_free(qname, env); + + return password; } AXIS2_EXTERN axis2_char_t *AXIS2_CALL @@ -1450,7 +928,10 @@ } endpoint_ref = savan_subscriber_get_end_to(subscriber, env); - endto = (axis2_char_t *) axis2_endpoint_ref_get_address(endpoint_ref, env); + if(endpoint_ref) + { + endto = (axis2_char_t *) axis2_endpoint_ref_get_address(endpoint_ref, env); + } endpoint_ref = savan_subscriber_get_notify_to(subscriber, env); notify = (axis2_char_t *) axis2_endpoint_ref_get_address(endpoint_ref, env); filter = savan_subscriber_get_filter(subscriber, env); @@ -1462,9 +943,12 @@ sub_elem = axiom_element_create(env, parent_node, ELEM_NAME_SUBSCRIBE, ns, &sub_node); /* EndTo element */ - endto_elem = axiom_element_create(env, sub_node, ELEM_NAME_ENDTO, ns, &endto_node); - axiom_element_set_text(endto_elem, env, endto, endto_node); - + if(endto) + { + endto_elem = axiom_element_create(env, sub_node, ELEM_NAME_ENDTO, ns, &endto_node); + axiom_element_set_text(endto_elem, env, endto, endto_node); + } + /* Delivery element */ delivery_elem = axiom_element_create(env, sub_node, ELEM_NAME_DELIVERY, ns, &delivery_node); @@ -1472,8 +956,12 @@ axiom_element_set_text(notify_elem, env, notify, notify_node); /* Expires element */ - expires_elem = axiom_element_create(env, sub_node, ELEM_NAME_EXPIRES, ns, &expires_node); - axiom_element_set_text(expires_elem, env, expires, expires_node); + if(expires) + { + expires_elem = axiom_element_create(env, sub_node, ELEM_NAME_EXPIRES, ns, &expires_node); + axiom_element_set_text(expires_elem, env, expires, expires_node); + } + /* Filter element */ filter_elem = axiom_element_create(env, sub_node, ELEM_NAME_FILTER, ns, &filter_node); axiom_element_set_text(filter_elem, env, filter, filter_node); @@ -1511,7 +999,7 @@ axiom_element_t* id_elem = NULL; axiom_element_t* topic_elem = NULL; axis2_char_t *id = NULL; - axis2_char_t *topic_url = NULL; + axis2_char_t *topic_name = NULL; AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_util_create_savan_specific_subscriber_node"); @@ -1543,10 +1031,10 @@ axiom_element_set_text(id_elem, env, id, id_node); /* Topic Url element */ - topic_url = savan_subscriber_get_topic_url(subscriber, env); + topic_name = savan_subscriber_get_filter(subscriber, env); ns3 = axiom_namespace_create (env, SAVAN_NAMESPACE, SAVAN_NS_PREFIX); - topic_elem = axiom_element_create(env, subs_node, ELEM_NAME_TOPIC, ns3, &topic_node); - axiom_element_set_text(topic_elem, env, topic_url, topic_node); + topic_elem = axiom_element_create(env, subs_node, ELEM_NAME_FILTER, ns3, &topic_node); + axiom_element_set_text(topic_elem, env, topic_name, topic_node); sub_node = savan_util_create_subscriber_node(env, subscriber, subs_node); if(!sub_node) @@ -1560,46 +1048,51 @@ return subs_node; } -AXIS2_EXTERN axis2_status_t AXIS2_CALL -savan_util_populate_topic( +AXIS2_EXTERN savan_storage_mgr_t * AXIS2_CALL +savan_util_get_storage_mgr( const axutil_env_t *env, - axis2_char_t *topic_url, + axis2_conf_ctx_t *conf_ctx, axis2_conf_t *conf) { - axis2_status_t status = AXIS2_SUCCESS; - axis2_char_t *dbname = NULL; - axutil_array_list_t *topic_store = NULL; - axis2_char_t sql_retrieve[256]; - int size = 0; - - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Entry:savan_util_populate_topic"); - - dbname = savan_util_get_dbname(env, conf), - - sprintf(sql_retrieve, "select topic_name from topic;"); + axutil_property_t *storage_mgr_prop = NULL; + savan_storage_mgr_t *storage_mgr = NULL; - topic_store = savan_db_mgr_retrieve_all(env, savan_util_get_dbname(env, conf), - savan_db_mgr_topic_find_callback, sql_retrieve); - - size = axutil_array_list_size(topic_store, env); - if(size == 0) + if(conf_ctx) { - axis2_char_t *topic_name = NULL; - - topic_name = savan_util_get_topic_name_from_topic_url(env, topic_url); - status = savan_db_mgr_insert_topic(env, dbname, topic_name, topic_url); - if(status == AXIS2_SUCCESS) + storage_mgr_prop = axis2_conf_ctx_get_property(conf_ctx, env, SAVAN_STORAGE_MANAGER); + if(storage_mgr_prop) { - AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Topic %s added", topic_url); + storage_mgr = (savan_storage_mgr_t *) axutil_property_get_value(storage_mgr_prop, env); } - else + } + + if(!storage_mgr) + { + storage_mgr = savan_storage_mgr_create(env, conf); + + if(storage_mgr && conf_ctx) { - AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[savan] Topic %s could not be added", topic_url); + storage_mgr_prop = axutil_property_create_with_args(env, 0, 0, 0, storage_mgr); + axis2_conf_ctx_set_property(conf_ctx, env, SAVAN_STORAGE_MANAGER, storage_mgr_prop); } } - AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, "[savan] Exit:savan_util_populate_topic"); + return storage_mgr; +} + +axis2_bool_t AXIS2_CALL +savan_util_is_valid_duration( + const axutil_env_t *env, + const axis2_char_t *duration) +{ + return AXIS2_TRUE; +} - return status; +axis2_bool_t AXIS2_CALL +savan_util_is_valid_date_time( + const axutil_env_t *env, + const axis2_char_t *duration) +{ + return AXIS2_TRUE; }
