Modified: webservices/sandesha/trunk/c/src/workers/in_order_invoker.c URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/workers/in_order_invoker.c?rev=433159&r1=433158&r2=433159&view=diff ============================================================================== --- webservices/sandesha/trunk/c/src/workers/in_order_invoker.c (original) +++ webservices/sandesha/trunk/c/src/workers/in_order_invoker.c Sun Aug 20 22:55:38 2006 @@ -184,7 +184,6 @@ AXIS2_PARAM_CHECK(env->error, seq_id, AXIS2_FAILURE); invoker_impl = SANDESHA2_INTF_TO_IMPL(invoker); - axis2_thread_mutex_lock(invoker_impl->mutex); for(i = 0; i < AXIS2_ARRAY_LIST_SIZE(invoker_impl->working_seqs, env); i++) { axis2_char_t *tmp_id = NULL; @@ -197,7 +196,6 @@ } if(0 == AXIS2_ARRAY_LIST_SIZE(invoker_impl->working_seqs, env)) invoker_impl->run_invoker = AXIS2_FALSE; - axis2_thread_mutex_unlock(invoker_impl->mutex); return AXIS2_SUCCESS; } @@ -209,9 +207,7 @@ AXIS2_ENV_CHECK(env, AXIS2_FAILURE); invoker_impl = SANDESHA2_INTF_TO_IMPL(invoker); - axis2_thread_mutex_lock(invoker_impl->mutex); SANDESHA2_INTF_TO_IMPL(invoker)->run_invoker = AXIS2_FALSE; - axis2_thread_mutex_unlock(invoker_impl->mutex); return AXIS2_SUCCESS; } @@ -226,9 +222,7 @@ AXIS2_ENV_CHECK(env, AXIS2_FALSE); invoker_impl = SANDESHA2_INTF_TO_IMPL(invoker); - axis2_thread_mutex_lock(invoker_impl->mutex); started = SANDESHA2_INTF_TO_IMPL(invoker)->run_invoker; - axis2_thread_mutex_unlock(invoker_impl->mutex); return started; } @@ -244,7 +238,6 @@ AXIS2_PARAM_CHECK(env->error, seq_id, AXIS2_FAILURE); invoker_impl = SANDESHA2_INTF_TO_IMPL(invoker); - axis2_thread_mutex_lock(invoker_impl->mutex); if(AXIS2_FALSE == sandesha2_utils_array_list_contains(env, invoker_impl->working_seqs, seq_id)) AXIS2_ARRAY_LIST_ADD(invoker_impl->working_seqs, env, seq_id); @@ -254,7 +247,6 @@ invoker_impl->run_invoker = AXIS2_TRUE; sandesha2_in_order_invoker_run(invoker, env); } - axis2_thread_mutex_unlock(invoker_impl->mutex); return AXIS2_SUCCESS; }
Modified: webservices/sandesha/trunk/c/src/workers/sender.c URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/workers/sender.c?rev=433159&r1=433158&r2=433159&view=diff ============================================================================== --- webservices/sandesha/trunk/c/src/workers/sender.c (original) +++ webservices/sandesha/trunk/c/src/workers/sender.c Sun Aug 20 22:55:38 2006 @@ -104,7 +104,7 @@ axiom_soap_envelope_t *soap_envelope); -void * AXIS2_THREAD_FUNC +static void * AXIS2_THREAD_FUNC sandesha2_sender_worker_func(axis2_thread_t *thd, void *data); axis2_status_t AXIS2_CALL @@ -204,7 +204,6 @@ AXIS2_PARAM_CHECK(env->error, seq_id, AXIS2_FAILURE); sender_impl = SANDESHA2_INTF_TO_IMPL(sender); - axis2_thread_mutex_lock(sender_impl->mutex); for(i = 0; i < AXIS2_ARRAY_LIST_SIZE(sender_impl->working_seqs, env); i++) { axis2_char_t *tmp_id = NULL; @@ -217,7 +216,6 @@ } if(0 == AXIS2_ARRAY_LIST_SIZE(sender_impl->working_seqs, env)) sender_impl->run_sender = AXIS2_FALSE; - axis2_thread_mutex_unlock(sender_impl->mutex); return AXIS2_SUCCESS; } @@ -229,9 +227,7 @@ AXIS2_ENV_CHECK(env, AXIS2_FAILURE); sender_impl = SANDESHA2_INTF_TO_IMPL(sender); - axis2_thread_mutex_lock(sender_impl->mutex); SANDESHA2_INTF_TO_IMPL(sender)->run_sender = AXIS2_FALSE; - axis2_thread_mutex_unlock(sender_impl->mutex); return AXIS2_SUCCESS; } @@ -240,16 +236,12 @@ (sandesha2_sender_t *sender, const axis2_env_t *env) { - axis2_bool_t started = AXIS2_FALSE; sandesha2_sender_impl_t *sender_impl = NULL; AXIS2_ENV_CHECK(env, AXIS2_FALSE); sender_impl = SANDESHA2_INTF_TO_IMPL(sender); - axis2_thread_mutex_lock(sender_impl->mutex); - started = SANDESHA2_INTF_TO_IMPL(sender)->run_sender; - axis2_thread_mutex_unlock(sender_impl->mutex); - return started; + sender_impl->run_sender; } axis2_status_t AXIS2_CALL @@ -261,11 +253,11 @@ sandesha2_sender_impl_t *sender_impl = NULL; AXIS2_ENV_CHECK(env, AXIS2_FAILURE); AXIS2_PARAM_CHECK(env->error, conf_ctx, AXIS2_FAILURE); - 
AXIS2_PARAM_CHECK(env->error, seq_id, AXIS2_FAILURE); sender_impl = SANDESHA2_INTF_TO_IMPL(sender); + axis2_thread_mutex_lock(sender_impl->mutex); - if(AXIS2_FALSE == sandesha2_utils_array_list_contains(env, + if(seq_id && AXIS2_FALSE == sandesha2_utils_array_list_contains(env, sender_impl->working_seqs, seq_id)) AXIS2_ARRAY_LIST_ADD(sender_impl->working_seqs, env, seq_id); if(AXIS2_FALSE == sender_impl->run_sender) @@ -279,12 +271,14 @@ } axis2_status_t AXIS2_CALL -sandesha2_sender_run (sandesha2_sender_t *sender, - const axis2_env_t *env) +sandesha2_sender_run ( + sandesha2_sender_t *sender, + const axis2_env_t *env) { sandesha2_sender_impl_t *sender_impl = NULL; axis2_thread_t *worker_thread = NULL; sandesha2_sender_args_t *args = NULL; + AXIS2_ENV_CHECK(env, AXIS2_FAILURE); sender_impl = SANDESHA2_INTF_TO_IMPL(sender); @@ -292,6 +286,11 @@ args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_sender_args_t)); args->impl = sender_impl; args->env = (axis2_env_t*)env; + /* Assigning like this causes seg faults when trying to allocate memory + * inside worker function. So created new env */ + /*args->env->allocator = env->allocator;*/ + args->env = axis2_env_create(env->allocator); + worker_thread = AXIS2_THREAD_POOL_GET_THREAD(env->thread_pool, sandesha2_sender_worker_func, (void*)args); if(NULL == worker_thread) @@ -319,7 +318,6 @@ AXIS2_ENV_CHECK(env, AXIS2_FAILURE); AXIS2_PARAM_CHECK(env->error, msg_ctx, AXIS2_FAILURE); - property = AXIS2_MSG_CTX_GET_PROPERTY(msg_ctx, env, AXIS2_TRANSPORT_IN, AXIS2_FALSE); if(NULL == property) @@ -437,8 +435,10 @@ /** * Thread worker function. 
*/ -void * AXIS2_THREAD_FUNC -sandesha2_sender_worker_func(axis2_thread_t *thd, void *data) +static void * AXIS2_THREAD_FUNC +sandesha2_sender_worker_func( + axis2_thread_t *thd, + void *data) { sandesha2_sender_impl_t *sender_impl = NULL; sandesha2_sender_t *sender = NULL; @@ -462,6 +462,7 @@ axis2_bool_t rollbacked = AXIS2_FALSE;*/ sandesha2_sender_mgr_t *mgr = NULL; sandesha2_sender_bean_t *sender_bean = NULL; + sandesha2_sender_bean_t *bean1 = NULL; sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL; axis2_char_t *key = NULL; axis2_msg_ctx_t *msg_ctx = NULL; @@ -475,18 +476,20 @@ axis2_transport_out_desc_t *transport_out = NULL; axis2_transport_sender_t *transport_sender = NULL; axis2_bool_t successfully_sent = AXIS2_FALSE; - sandesha2_sender_bean_t *bean1 = NULL; - + axis2_char_t *msg_id = NULL; + sleep(1); transaction = SANDESHA2_STORAGE_MGR_GET_TRANSACTION(storage_mgr, env); mgr = SANDESHA2_STORAGE_MGR_GET_RETRANS_MGR(storage_mgr, env); seq_prop_mgr = SANDESHA2_STORAGE_MGR_GET_SEQ_PROPERTY_MGR( storage_mgr, env); - sender_bean = SANDESHA2_SENDER_MGR_GET_NEXT_MSG_TO_SEND(mgr, env); if(NULL == sender_bean) + { + printf("sender_bean is NULL\n"); continue; + } key = SANDESHA2_SENDER_BEAN_GET_MSG_CONTEXT_REF_KEY(sender_bean, env); msg_ctx = SANDESHA2_STORAGE_MGR_RETRIEVE_MSG_CTX(storage_mgr, env, key, @@ -506,7 +509,10 @@ continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_bean, sender_impl->conf_ctx, storage_mgr); if(AXIS2_FALSE == continue_sending) + { + printf("continue_sending true\n"); continue; + } property = AXIS2_MSG_CTX_GET_PROPERTY(msg_ctx, env, SANDESHA2_QUALIFIED_FOR_SENDING, AXIS2_FALSE); @@ -526,27 +532,49 @@ if(NULL != msgs_not_to_send) { int j = 0; + axis2_bool_t continue_sending = AXIS2_FALSE; + for(j = 0; j < AXIS2_ARRAY_LIST_SIZE(msgs_not_to_send, env); j++) { axis2_char_t *value = NULL; int int_val = -1; + int msg_type = -1; value = AXIS2_ARRAY_LIST_GET(msgs_not_to_send, env, j); int_val = atoi(value); - 
if(SANDESHA2_MSG_CTX_GET_MSG_TYPE(rm_msg_ctx, env) == int_val) - continue_sending = AXIS2_FALSE; + msg_type = SANDESHA2_MSG_CTX_GET_MSG_TYPE(rm_msg_ctx, env); + if(msg_type == int_val) + continue_sending = AXIS2_TRUE; } - if(AXIS2_FALSE == continue_sending) + if(AXIS2_TRUE == continue_sending) continue; } + /* + * This method is not implemented yet + * update_msg(sender, env, msg_xtx); + */ msg_type = SANDESHA2_MSG_CTX_GET_MSG_TYPE(rm_msg_ctx, env); + if(msg_type == SANDESHA2_MSG_TYPE_APPLICATION) + { + sandesha2_seq_t *seq = NULL; + axis2_char_t *seq_id = NULL; + sandesha2_identifier_t *identifier = NULL; + + seq = (sandesha2_seq_t*) + SANDESHA2_MSG_CTX_GET_MSG_PART(rm_msg_ctx, + env, SANDESHA2_MSG_PART_SEQ); + identifier = SANDESHA2_SEQ_GET_IDENTIFIER(seq, env); + seq_id = SANDESHA2_IDENTIFIER_GET_IDENTIFIER(identifier, env); + } if(AXIS2_TRUE == sandesha2_sender_is_piggybackable_msg_type(sender, env, msg_type) && AXIS2_FALSE == sandesha2_sender_is_ack_already_piggybacked(sender, env, rm_msg_ctx)) + { sandesha2_ack_mgr_piggyback_acks_if_present(env, rm_msg_ctx, storage_mgr); + } transport_out = AXIS2_MSG_CTX_GET_TRANSPORT_OUT_DESC(msg_ctx, env); @@ -574,16 +602,34 @@ SANDESHA2_VALUE_TRUE, env)); AXIS2_MSG_CTX_SET_PROPERTY(msg_ctx, env, SANDESHA2_WITHIN_TRANSACTION, property, AXIS2_FALSE); - - bean1 = SANDESHA2_SENDER_MGR_RETRIEVE(mgr, env, - SANDESHA2_SENDER_BEAN_GET_MSG_ID(sender_bean, env)); + msg_id = SANDESHA2_SENDER_BEAN_GET_MSG_ID(sender_bean, env); + bean1 = SANDESHA2_SENDER_MGR_RETRIEVE(mgr, env, msg_id); if(NULL != bean1) { - SANDESHA2_SENDER_BEAN_SET_SENT_COUNT(bean1, env, + axis2_bool_t resend = AXIS2_FALSE; + + resend = SANDESHA2_SENDER_BEAN_IS_RESEND(sender_bean, env); + printf("resend:%d\n", resend); + if(AXIS2_TRUE == resend) + { + SANDESHA2_SENDER_BEAN_SET_SENT_COUNT(bean1, env, SANDESHA2_SENDER_BEAN_GET_SENT_COUNT(sender_bean, env)); - SANDESHA2_SENDER_BEAN_SET_TIME_TO_SEND(bean1, env, + SANDESHA2_SENDER_BEAN_SET_TIME_TO_SEND(bean1, env, 
SANDESHA2_SENDER_BEAN_GET_TIME_TO_SEND(sender_bean, env)); - SANDESHA2_SENDER_BEAN_MGR_UPDATE(mgr, env, bean1); + SANDESHA2_SENDER_MGR_UPDATE(mgr, env, bean1); + } + else + { + axis2_char_t *msg_stored_key = NULL; + + msg_id = SANDESHA2_SENDER_BEAN_GET_MSG_ID(bean1, env); + SANDESHA2_SENDER_MGR_REMOVE(mgr, env, msg_id); + /* Removing the message from the storage */ + msg_stored_key = SANDESHA2_SENDER_BEAN_GET_MSG_CONTEXT_REF_KEY( + bean1, env); + SANDESHA2_STORAGE_MGR_REMOVE_MSG_CTX(storage_mgr, env, + msg_stored_key); + } } if(AXIS2_TRUE == successfully_sent) { @@ -618,8 +664,12 @@ AXIS2_MSG_CTX_SET_PROPERTY(msg_ctx, env, SANDESHA2_WITHIN_TRANSACTION, property, AXIS2_FALSE); /* TODO make transaction handling effective */ - SANDESHA2_TRANSACTION_COMMIT(transaction, env); + if(transaction) + { + SANDESHA2_TRANSACTION_COMMIT(transaction, env); + } } + axis2_env_free(env); return NULL; } Modified: webservices/sandesha/trunk/c/src/wsrm/identifier.c URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/wsrm/identifier.c?rev=433159&r1=433158&r2=433159&view=diff ============================================================================== --- webservices/sandesha/trunk/c/src/wsrm/identifier.c (original) +++ webservices/sandesha/trunk/c/src/wsrm/identifier.c Sun Aug 20 22:55:38 2006 @@ -39,17 +39,19 @@ const axis2_env_t *env); void* AXIS2_CALL -sandesha2_identifier_from_om_node(sandesha2_iom_rm_element_t *identifier, - const axis2_env_t *env, axiom_node_t *om_node); +sandesha2_identifier_from_om_node( + sandesha2_iom_rm_element_t *identifier, + const axis2_env_t *env, axiom_node_t *om_node); axiom_node_t* AXIS2_CALL -sandesha2_identifier_to_om_node(sandesha2_iom_rm_element_t *identifier, - const axis2_env_t *env, void *om_node); +sandesha2_identifier_to_om_node( + sandesha2_iom_rm_element_t *identifier, + const axis2_env_t *env, void *om_node); axis2_bool_t AXIS2_CALL sandesha2_identifier_is_namespace_supported( - sandesha2_iom_rm_element_t *identifier, - 
const axis2_env_t *env, axis2_char_t *namespace); + sandesha2_iom_rm_element_t *identifier, + const axis2_env_t *env, axis2_char_t *namespace); axis2_char_t * AXIS2_CALL sandesha2_identifier_get_identifier(sandesha2_identifier_t *identifier, @@ -66,20 +68,14 @@ /***************************** End of function headers ************************/ AXIS2_EXTERN sandesha2_identifier_t* AXIS2_CALL -sandesha2_identifier_create(const axis2_env_t *env, axis2_char_t *ns_val) +sandesha2_identifier_create( + const axis2_env_t *env, + axis2_char_t *ns_val) { sandesha2_identifier_impl_t *identifier_impl = NULL; AXIS2_ENV_CHECK(env, NULL); AXIS2_PARAM_CHECK(env->error, ns_val, NULL); - - if(AXIS2_FALSE == sandesha2_identifier_is_namespace_supported( - (sandesha2_iom_rm_element_t*)identifier_impl, env, - ns_val)) - { - AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_UNSUPPORTED_NS, - AXIS2_FAILURE); - return NULL; - } + identifier_impl = (sandesha2_identifier_impl_t *)AXIS2_MALLOC (env->allocator, sizeof(sandesha2_identifier_impl_t)); @@ -93,9 +89,17 @@ identifier_impl->identifier.ops = NULL; identifier_impl->identifier.element.ops = NULL; + if(AXIS2_FALSE == sandesha2_identifier_is_namespace_supported( + (sandesha2_iom_rm_element_t*)identifier_impl, env, + ns_val)) + { + AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_UNSUPPORTED_NS, + AXIS2_FAILURE); + return NULL; + } identifier_impl->identifier.ops = AXIS2_MALLOC(env->allocator, - sizeof(sandesha2_iom_rm_element_ops_t)); + sizeof(sandesha2_identifier_ops_t)); if(NULL == identifier_impl->identifier.ops) { sandesha2_identifier_free((sandesha2_iom_rm_element_t*) @@ -104,7 +108,7 @@ return NULL; } identifier_impl->identifier.element.ops = AXIS2_MALLOC(env->allocator, - sizeof(sandesha2_identifier_ops_t)); + sizeof(sandesha2_iom_rm_element_ops_t)); if(NULL == identifier_impl->identifier.element.ops) { sandesha2_identifier_free((sandesha2_iom_rm_element_t*) @@ -178,8 +182,9 @@ void* AXIS2_CALL 
-sandesha2_identifier_from_om_node(sandesha2_iom_rm_element_t *identifier, - const axis2_env_t *env, axiom_node_t *om_node) +sandesha2_identifier_from_om_node( + sandesha2_iom_rm_element_t *identifier, + const axis2_env_t *env, axiom_node_t *om_node) { sandesha2_identifier_impl_t *identifier_impl = NULL; axiom_element_t *om_element = NULL; --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
