Repository: qpid-dispatch Updated Branches: refs/heads/master 118518516 -> 152185eb9
DISPATCH-522 - Reduce the number of times the core thread invokes connection activation during times of high traffic volume. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/152185eb Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/152185eb Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/152185eb Branch: refs/heads/master Commit: 152185eb91d58a924f982191bdf95d954199c47c Parents: 1185185 Author: Ted Ross <tr...@redhat.com> Authored: Wed Sep 21 20:16:10 2016 -0700 Committer: Ted Ross <tr...@redhat.com> Committed: Wed Sep 21 20:16:10 2016 -0700 ---------------------------------------------------------------------- include/qpid/dispatch/router_core.h | 3 ++- include/qpid/dispatch/server.h | 3 ++- src/container.c | 2 +- src/router_core/connections.c | 13 ++++++++++++- src/router_core/router_core_private.h | 5 ++++- src/router_core/router_core_thread.c | 17 +++++++++++++++++ src/router_node.c | 4 ++-- src/server.c | 9 +++++---- 8 files changed, 45 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/152185eb/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 8eaa7ef..9421481 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -219,8 +219,9 @@ int qdr_connection_process(qdr_connection_t *conn); * * @param context The context supplied when the callback was registered * @param conn The connection object to be activated + * @param awaken Iff true, awaken the driver poll loop after the activation */ -typedef void (*qdr_connection_activate_t) (void *context, qdr_connection_t *conn); +typedef void (*qdr_connection_activate_t) (void *context, qdr_connection_t *conn, bool awaken); /** ****************************************************************************** http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/152185eb/include/qpid/dispatch/server.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h index a0ee4b5..9862895 100644 --- a/include/qpid/dispatch/server.h +++ b/include/qpid/dispatch/server.h @@ -538,8 +538,9 @@ void qd_connection_set_user(qd_connection_t *conn); * internal work list and be invoked for processing by a worker thread. * * @param conn The connection over which the application wishes to send data + * @param awaken Iff true, wakeup the driver poll after the activation */ -void qd_server_activate(qd_connection_t *conn); +void qd_server_activate(qd_connection_t *conn, bool awaken); /** http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/152185eb/src/container.c ---------------------------------------------------------------------- diff --git a/src/container.c b/src/container.c index 92c0e0e..5080258 100644 --- a/src/container.c +++ b/src/container.c @@ -933,7 +933,7 @@ void qd_link_activate(qd_link_t *link) if (!ctx) return; - qd_server_activate(ctx); + qd_server_activate(ctx, true); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/152185eb/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 634a07f..1cd054b 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -373,7 +373,10 @@ void qdr_connection_handlers(qdr_core_t *core, void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn) { - core->activate_handler(core->user_context, conn); + if (!conn->in_activate_list) { + DEQ_INSERT_TAIL_N(ACTIVATE, core->connections_to_activate, conn); + conn->in_activate_list = true; + } } @@ -979,6 +982,14 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo work = DEQ_HEAD(conn->work_list); } + // + // If this connection is on the activation list, remove it from the list + // + if (conn->in_activate_list) { + conn->in_activate_list = false; + DEQ_REMOVE_N(ACTIVATE, core->connections_to_activate, conn); + } + DEQ_REMOVE(core->open_connections, conn); sys_mutex_free(conn->work_lock); free_qdr_connection_t(conn); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/152185eb/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 2f55eee..9fa86e9 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -434,9 +434,11 @@ DEQ_DECLARE(qdr_connection_work_t, qdr_connection_work_list_t); struct qdr_connection_t { DEQ_LINKS(qdr_connection_t); + DEQ_LINKS_N(ACTIVATE, qdr_connection_t); qdr_core_t *core; void *user_context; bool incoming; + bool in_activate_list; qdr_connection_role_t role; int inter_router_cost; qdr_conn_identifier_t *conn_id; @@ -524,7 +526,9 @@ struct qdr_core_t { qd_timer_t *work_timer; qdr_connection_list_t open_connections; + qdr_connection_list_t connections_to_activate; qdr_link_list_t open_links; + // // Agent section // @@ -587,7 +591,6 @@ struct qdr_core_t { uint64_t next_identifier; sys_mutex_t *id_lock; - qdr_forwarder_t *forwarders[QD_TREATMENT_LINK_BALANCED + 1]; }; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/152185eb/src/router_core/router_core_thread.c ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c index b359602..5cffeb9 100644 --- a/src/router_core/router_core_thread.c +++ b/src/router_core/router_core_thread.c @@ -30,6 +30,18 @@ ALLOC_DEFINE(qdr_action_t); +static void qdr_activate_connections_CT(qdr_core_t *core) +{ + qdr_connection_t *conn = DEQ_HEAD(core->connections_to_activate); + while (conn) { + DEQ_REMOVE_HEAD_N(ACTIVATE, core->connections_to_activate); + conn->in_activate_list = false; + core->activate_handler(core->user_context, conn, DEQ_IS_EMPTY(core->connections_to_activate)); + conn = DEQ_HEAD(core->connections_to_activate); + } +} + + void *router_core_thread(void *arg) { qdr_core_t *core = (qdr_core_t*) arg; @@ -72,6 +84,11 @@ void *router_core_thread(void *arg) free_qdr_action_t(action); action = DEQ_HEAD(action_list); } + + // + // Activate all connections that were flagged for activation during the above processing + // + qdr_activate_connections_CT(core); } qd_log(core->log, QD_LOG_INFO, "Router Core thread exited"); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/152185eb/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index 7b46873..9b6b436 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -712,14 +712,14 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are } -static void CORE_connection_activate(void *context, qdr_connection_t *conn) +static void CORE_connection_activate(void *context, qdr_connection_t *conn, bool awaken) { // // IMPORTANT: This is the only core callback that is invoked on the core // thread itself. It is imperative that this function do nothing // apart from setting the activation in the server for the connection. // - qd_server_activate((qd_connection_t*) qdr_connection_get_context(conn)); + qd_server_activate((qd_connection_t*) qdr_connection_get_context(conn), awaken); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/152185eb/src/server.c ---------------------------------------------------------------------- diff --git a/src/server.c b/src/server.c index e78978d..47cbac2 100644 --- a/src/server.c +++ b/src/server.c @@ -1580,7 +1580,7 @@ void qd_server_resume(qd_dispatch_t *qd) } -void qd_server_activate(qd_connection_t *ctx) +void qd_server_activate(qd_connection_t *ctx, bool awaken) { if (!ctx) return; @@ -1591,7 +1591,8 @@ void qd_server_activate(qd_connection_t *ctx) if (!qdpn_connector_closed(ctor)) { qdpn_connector_activate(ctor, QDPN_CONNECTOR_WRITABLE); - qdpn_driver_wakeup(ctx->server->driver); + if (awaken) + qdpn_driver_wakeup(ctx->server->driver); } } @@ -1668,7 +1669,7 @@ void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, vo DEQ_INSERT_TAIL(conn->deferred_calls, dc); sys_mutex_unlock(conn->deferred_call_lock); - qd_server_activate(conn); + qd_server_activate(conn, true); } @@ -1676,7 +1677,7 @@ void qd_connection_set_event_stall(qd_connection_t *conn, bool stall) { conn->event_stall = stall; if (!stall) - qd_server_activate(conn); + qd_server_activate(conn, true); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org