Repository: qpid-dispatch Updated Branches: refs/heads/master 1ee98d495 -> 113228490
DISPATCH-479 - Use atomic ops for ref_counts [Patch from Ulf Lilleengen] This closes #96 Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/11322849 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/11322849 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/11322849 Branch: refs/heads/master Commit: 113228490fcae32c9f1b6a898147e4f0ff0fe504 Parents: 1ee98d4 Author: Ted Ross <tr...@redhat.com> Authored: Fri Sep 16 12:14:32 2016 -0400 Committer: Ted Ross <tr...@redhat.com> Committed: Fri Sep 16 12:14:32 2016 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/atomic.h | 144 +++++++++++++++++++++++++++++ src/connection_manager.c | 43 ++++----- src/message.c | 10 +- src/message_private.h | 3 +- src/router_core/forwarder.c | 4 +- src/router_core/router_core_private.h | 3 +- src/router_core/transfer.c | 22 ++--- 7 files changed, 178 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/include/qpid/dispatch/atomic.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/atomic.h b/include/qpid/dispatch/atomic.h new file mode 100644 index 0000000..567b722 --- /dev/null +++ b/include/qpid/dispatch/atomic.h @@ -0,0 +1,144 @@ +#ifndef __sys_atomic_h__ +#define __sys_atomic_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/**@file + * Portable atomic operations on uint32_t. + */ + +#include <stdint.h> + +/****************************************************************************** + * C11 atomics * + ******************************************************************************/ +#if defined(__STDC__) && (__STDC_VERSION__ >= 201112L) && !defined(__STDC_NO_ATOMICS__) + +#include <stdatomic.h> +typedef atomic_uint sys_atomic_t; + +static inline void sys_atomic_init(sys_atomic_t *ref, uint32_t value) +{ + atomic_store(ref, value); +} + +static inline uint32_t sys_atomic_add(sys_atomic_t *ref, uint32_t value) +{ + return atomic_fetch_add(ref, value); +} + +static inline uint32_t sys_atomic_sub(sys_atomic_t *ref, uint32_t value) +{ + return atomic_fetch_sub(ref, value); +} + +static inline uint32_t sys_atomic_get(sys_atomic_t *ref) +{ + return atomic_load(ref); +} + +static inline void sys_atomic_destroy(sys_atomic_t *ref) {} + +#elif defined(__GNUC__) || defined(__clang__) + +/****************************************************************************** + * GCC specific atomics * + ******************************************************************************/ + +typedef volatile uint32_t sys_atomic_t; + +static inline void sys_atomic_init(sys_atomic_t *ref, uint32_t value) +{ + *ref = value; +} + +static inline uint32_t sys_atomic_add(sys_atomic_t *ref, uint32_t value) +{ + return __sync_fetch_and_add(ref, value); +} + +static inline uint32_t sys_atomic_sub(sys_atomic_t *ref, uint32_t value) +{ + return __sync_fetch_and_sub(ref, value); +} + +static inline uint32_t sys_atomic_get(sys_atomic_t *ref) +{ + return *ref; +} + +static inline void sys_atomic_destroy(sys_atomic_t *ref) {} + +#else + +/****************************************************************************** + * Mutex fallback atomics * + ******************************************************************************/ +#include <qpid/dispatch/threading.h> + +struct sys_atomic_t { + sys_mutex_t *lock; + uint32_t value; +}; +typedef struct sys_atomic_t sys_atomic_t; + +static inline void sys_atomic_init(sys_atomic_t *ref, uint32_t value) +{ + ref->lock = sys_mutex(); + ref->value = value; +} + +static inline uint32_t sys_atomic_add(sys_atomic_t *ref, uint32_t value) +{ + sys_mutex_lock(ref->lock); + uint32_t prev = ref->value; + ref->value += value; + sys_mutex_unlock(ref->lock); + return prev; +} + +static inline uint32_t sys_atomic_sub(sys_atomic_t *ref, uint32_t value) +{ + sys_mutex_lock(ref->lock); + uint32_t prev = ref->value; + ref->value -= value; + sys_mutex_unlock(ref->lock); + return prev; +} + +static inline uint32_t sys_atomic_get(sys_atomic_t *ref) +{ + sys_mutex_lock(ref->lock); + uint32_t value = ref->value; + sys_mutex_unlock(ref->lock); + return value; +} + +static inline void sys_atomic_destroy(sys_atomic_t *ref) +{ + sys_mutex_lock(ref->lock); + sys_mutex_free(ref->lock); +} + +#endif + +#define sys_atomic_inc(ref) sys_atomic_add((ref), 1) +#define sys_atomic_dec(ref) sys_atomic_sub((ref), 1) + +#endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/connection_manager.c ---------------------------------------------------------------------- diff --git a/src/connection_manager.c b/src/connection_manager.c index bdea6aa..a779fd6 100644 --- a/src/connection_manager.c +++ b/src/connection_manager.c @@ -20,6 +20,7 @@ #include <qpid/dispatch/connection_manager.h> #include <qpid/dispatch/ctools.h> #include <qpid/dispatch/threading.h> +#include <qpid/dispatch/atomic.h> #include "dispatch_private.h" #include "connection_manager_private.h" #include "server_private.h" @@ -33,16 +34,16 @@ static char* HOST_ADDR_DEFAULT = "127.0.0.1"; struct qd_config_ssl_profile_t { DEQ_LINKS(qd_config_ssl_profile_t); - uint64_t identity; - char *name; - char *ssl_password; - char *ssl_trusted_certificate_db; - char *ssl_trusted_certificates; - char *ssl_uid_format; - char *ssl_display_name_file; - char *ssl_certificate_file; - char *ssl_private_key_file; - int ref_count; + uint64_t identity; + char *name; + char *ssl_password; + char *ssl_trusted_certificate_db; + char *ssl_trusted_certificates; + char *ssl_uid_format; + char *ssl_display_name_file; + char *ssl_certificate_file; + char *ssl_private_key_file; + sys_atomic_t ref_count; }; struct qd_config_listener_t { @@ -224,9 +225,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf config->ssl_uid_format = (*ssl_profile)->ssl_uid_format; config->ssl_display_name_file = (*ssl_profile)->ssl_display_name_file; } - sys_mutex_lock(qd->connection_manager->ssl_profile_lock); - (*ssl_profile)->ref_count++; - sys_mutex_unlock(qd->connection_manager->ssl_profile_lock); + sys_atomic_inc(&(*ssl_profile)->ref_count); } free(stripAnnotations); @@ -254,9 +253,8 @@ qd_config_ssl_profile_t *qd_dispatch_configure_ssl_profile(qd_dispatch_t *qd, qd ssl_profile->ssl_trusted_certificates = qd_entity_opt_string(entity, "trustedCerts", 0); CHECK(); ssl_profile->ssl_uid_format = qd_entity_opt_string(entity, "uidFormat", 0); CHECK(); ssl_profile->ssl_display_name_file = qd_entity_opt_string(entity, "displayNameFile", 0); CHECK(); - sys_mutex_lock(qd->connection_manager->ssl_profile_lock); - ssl_profile->ref_count = 0; - sys_mutex_unlock(qd->connection_manager->ssl_profile_lock); + + sys_atomic_init(&ssl_profile->ref_count, 0); qd_log(cm->log_source, QD_LOG_INFO, "Created SSL Profile with name %s ", ssl_profile->name); return ssl_profile; @@ -413,9 +411,7 @@ void qd_config_connector_free(qd_connection_manager_t *cm, qd_config_connector_t qd_server_connector_free(cc->connector); if (cc->ssl_profile) { - sys_mutex_lock(cm->ssl_profile_lock); - cc->ssl_profile->ref_count--; - sys_mutex_unlock(cm->ssl_profile_lock); + sys_atomic_dec(&cc->ssl_profile->ref_count); } free(cc); @@ -431,9 +427,7 @@ void qd_config_listener_free(qd_connection_manager_t *cm, qd_config_listener_t * } if (cl->ssl_profile) { - sys_mutex_lock(cm->ssl_profile_lock); - cl->ssl_profile->ref_count--; - sys_mutex_unlock(cm->ssl_profile_lock); + sys_atomic_dec(&cl->ssl_profile->ref_count); } free(cl); @@ -442,12 +436,9 @@ void qd_config_listener_free(qd_connection_manager_t *cm, qd_config_listener_t * bool qd_config_ssl_profile_free(qd_connection_manager_t *cm, qd_config_ssl_profile_t *ssl_profile) { - sys_mutex_lock(cm->ssl_profile_lock); - if (ssl_profile->ref_count != 0) { - sys_mutex_unlock(cm->ssl_profile_lock); + if (sys_atomic_get(&ssl_profile->ref_count) != 0) { return false; } - sys_mutex_unlock(cm->ssl_profile_lock); DEQ_REMOVE(cm->config_ssl_profiles, ssl_profile); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/message.c ---------------------------------------------------------------------- diff --git a/src/message.c b/src/message.c index 99fcae8..ab16e9d 100644 --- a/src/message.c +++ b/src/message.c @@ -561,7 +561,7 @@ qd_message_t *qd_message() memset(msg->content, 0, sizeof(qd_message_content_t)); msg->content->lock = sys_mutex(); - msg->content->ref_count = 1; + sys_atomic_init(&msg->content->ref_count, 1); msg->content->parse_depth = QD_DEPTH_NONE; msg->content->parsed_message_annotations = 0; @@ -581,9 +581,7 @@ void qd_message_free(qd_message_t *in_msg) qd_message_content_t *content = msg->content; - sys_mutex_lock(content->lock); - rc = --content->ref_count; - sys_mutex_unlock(content->lock); + rc = sys_atomic_dec(&content->ref_count) - 1; if (rc == 0) { if (content->parsed_message_annotations) @@ -621,9 +619,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg) copy->content = content; - sys_mutex_lock(content->lock); - content->ref_count++; - sys_mutex_unlock(content->lock); + sys_atomic_inc(&content->ref_count); return (qd_message_t*) copy; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/message_private.h ---------------------------------------------------------------------- diff --git a/src/message_private.h b/src/message_private.h index 8ede2c7..0387302 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -22,6 +22,7 @@ #include <qpid/dispatch/message.h> #include "alloc.h" #include <qpid/dispatch/threading.h> +#include <qpid/dispatch/atomic.h> /** @file * Message representation. @@ -65,7 +66,7 @@ typedef struct { typedef struct { sys_mutex_t *lock; - uint32_t ref_count; // The number of messages referencing this + sys_atomic_t ref_count; // The number of messages referencing this qd_buffer_list_t buffers; // The buffer chain containing the message qd_field_location_t section_message_header; // The message header list qd_field_location_t section_delivery_annotation; // The delivery annotation map http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index edd540d..1f4031b 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -118,7 +118,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in dlv->peer = in_dlv; in_dlv->peer = dlv; - dlv->ref_count = 1; + sys_atomic_init(&dlv->ref_count, 1); qdr_delivery_incref(in_dlv); } } @@ -162,7 +162,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t * DEQ_INSERT_TAIL(link->undelivered, dlv); dlv->where = QDR_DELIVERY_IN_UNDELIVERED; - dlv->ref_count++; // We have the lock, don't use the incref function + sys_atomic_inc(&dlv->ref_count); // // If the link isn't already on the links_with_deliveries list, put it there. http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 594e200..02568ce 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -22,6 +22,7 @@ #include "dispatch_private.h" #include <qpid/dispatch/router_core.h> #include <qpid/dispatch/threading.h> +#include <qpid/dispatch/atomic.h> #include <qpid/dispatch/log.h> #include <memory.h> @@ -201,7 +202,7 @@ typedef enum { struct qdr_delivery_t { DEQ_LINKS(qdr_delivery_t); void *context; - int ref_count; + sys_atomic_t ref_count; qdr_link_t *link; qdr_delivery_t *peer; qd_message_t *msg; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/11322849/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index b1348a1..9a30cdf 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -44,7 +44,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_i qdr_delivery_t *dlv = new_qdr_delivery_t(); ZERO(dlv); - dlv->ref_count = 1; // referenced by the action + sys_atomic_init(&dlv->ref_count, 1); // referenced by the action dlv->link = link; dlv->msg = msg; dlv->to_addr = 0; @@ -66,7 +66,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg, qdr_delivery_t *dlv = new_qdr_delivery_t(); ZERO(dlv); - dlv->ref_count = 1; // referenced by the action + sys_atomic_init(&dlv->ref_count, 1); // referenced by the action dlv->link = link; dlv->msg = msg; dlv->to_addr = addr; @@ -90,7 +90,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t * qdr_delivery_t *dlv = new_qdr_delivery_t(); ZERO(dlv); - dlv->ref_count = 1; // referenced by the action + sys_atomic_init(&dlv->ref_count, 1); // referenced by the action dlv->link = link; dlv->msg = msg; dlv->settled = settled; @@ -248,15 +248,12 @@ void *qdr_delivery_get_context(qdr_delivery_t *delivery) return delivery->context; } - void qdr_delivery_incref(qdr_delivery_t *delivery) { qdr_connection_t *conn = delivery->link ? delivery->link->conn : 0; if (!!conn) { - sys_mutex_lock(conn->work_lock); - delivery->ref_count++; - sys_mutex_unlock(conn->work_lock); + sys_atomic_inc(&delivery->ref_count); } } @@ -267,12 +264,9 @@ static void qdr_delivery_decref_internal(qdr_delivery_t *delivery, bool lock_hel bool delete = false; if (!!conn) { - if (!lock_held) - sys_mutex_lock(conn->work_lock); - assert(delivery->ref_count > 0); - delete = --delivery->ref_count == 0; - if (!lock_held) - sys_mutex_unlock(conn->work_lock); + uint32_t ref_count = sys_atomic_dec(&delivery->ref_count); + assert(ref_count > 0); + delete = (ref_count - 1) == 0; } if (delete) { @@ -805,7 +799,7 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv) sys_mutex_lock(link->conn->work_lock); if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) { - dlv->ref_count++; // We have the lock, don't use the incref function + sys_atomic_inc(&dlv->ref_count); qdr_add_delivery_ref(&link->updated_deliveries, dlv); qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY); activate = true; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org