Hi, this is the first try of a lockless scheduler, using two 64-bit variables per worker thread instead of the single active_connections variable.
This way the worker thread and the manager thread each write only to their own variables, so there is no data race on the writes. I have measured the speed to be within variance on a 6-core CPU. - Lauri
>From a02332bf19ea52a75fcf00545211722e1993cb8e Mon Sep 17 00:00:00 2001 From: Lauri Kasanen <[email protected]> Date: Fri, 4 May 2012 19:11:07 +0300 Subject: [PATCH] First try at lockless sched Signed-off-by: Lauri Kasanen <[email protected]> --- src/include/mk_scheduler.h | 2 +- src/include/monkey.h | 3 --- src/mk_scheduler.c | 42 +++++++++++++++++++++++------------------- src/monkey.c | 3 --- 4 files changed, 24 insertions(+), 26 deletions(-) diff --git a/src/include/mk_scheduler.h b/src/include/mk_scheduler.h index 7e74638..661f13b 100644 --- a/src/include/mk_scheduler.h +++ b/src/include/mk_scheduler.h @@ -46,7 +46,7 @@ struct sched_connection /* Global struct */ struct sched_list_node { - unsigned short int active_connections; + unsigned long long closed_connections, accepted_connections; struct mk_list busy_queue; struct mk_list av_queue; diff --git a/src/include/monkey.h b/src/include/monkey.h index 097da93..e3f9b79 100644 --- a/src/include/monkey.h +++ b/src/include/monkey.h @@ -31,9 +31,6 @@ #define SH_NOCGI 0 #define SH_CGI 1 - -/* Thread mutexes */ -pthread_mutex_t mutex_sched_active_connections; mk_pointer mk_monkey_protocol; /* Process UID/GID */ diff --git a/src/mk_scheduler.c b/src/mk_scheduler.c index 6e75ba2..f3ba736 100644 --- a/src/mk_scheduler.c +++ b/src/mk_scheduler.c @@ -43,23 +43,33 @@ #include "mk_macros.h" /* - * Returns the worker id which should take a new incomming connection, - * it returns the worker id with less active connections + * Returns the worker id which should take a new incoming connection, + * the one with the least active connections. 
*/ static inline int _next_target() { int i; int target = 0; + unsigned long long tmp = 0, cur = 0; + + cur = sched_list[0].accepted_connections - sched_list[0].closed_connections; + if (cur == 0) + return 0; /* Finds the lowest load worker */ for (i = 1; i < config->workers; i++) { - if (sched_list[i].active_connections < sched_list[target].active_connections) { + tmp = sched_list[i].accepted_connections - sched_list[i].closed_connections; + if (tmp < cur) { target = i; + cur = tmp; + + if (cur == 0) + break; } } /* If sched_list[target] worker is full then the whole server too, because it has the lowest load. */ - if(sched_list[target].active_connections >= config->worker_capacity) { + if(cur >= config->worker_capacity) { MK_TRACE("Too many clients: %i", config->worker_capacity * config->workers); return -1; } @@ -88,18 +98,11 @@ inline int mk_sched_add_client(int remote_fd) MK_TRACE("[FD %i] Balance to WID %i", remote_fd, sched->idx); - pthread_mutex_lock(&mutex_sched_active_connections); - sched->active_connections += 1; - pthread_mutex_unlock(&mutex_sched_active_connections); - r = mk_epoll_add(sched->epoll_fd, remote_fd, MK_EPOLL_WRITE, MK_EPOLL_LEVEL_TRIGGERED); - /* If epoll has failed, decrement the active connections counter */ - if (r != 0) { - pthread_mutex_lock(&mutex_sched_active_connections); - sched->active_connections -= 1; - pthread_mutex_unlock(&mutex_sched_active_connections); + if (r == 0) { + sched->accepted_connections++; } return r; @@ -115,7 +118,11 @@ int mk_sched_register_client(int remote_fd, struct sched_list_node *sched) struct sched_connection *sched_conn; struct mk_list *av_queue = &sched->av_queue; - if (sched->active_connections < config->worker_capacity) { + unsigned long long active_connections; + + active_connections = sched->accepted_connections - sched->closed_connections; + + if (active_connections < config->worker_capacity) { sched_conn = mk_list_entry_first(av_queue, struct sched_connection, _head); /* Before to continue, 
we need to run plugin stage 10 */ @@ -215,7 +222,6 @@ int mk_sched_register_thread(int efd) static int wid = 0; sl = &sched_list[wid]; - sl->active_connections = 0; sl->idx = wid++; sl->tid = pthread_self(); @@ -265,7 +271,7 @@ int mk_sched_launch_thread(int max_events) return -1; } - thconf = mk_mem_malloc(sizeof(sched_thread_conf)); + thconf = mk_mem_malloc_z(sizeof(sched_thread_conf)); thconf->epoll_fd = efd; thconf->epoll_max_events = max_events*2; thconf->max_events = max_events; @@ -335,9 +341,7 @@ int mk_sched_remove_client(struct sched_list_node *sched, int remote_fd) /* Invoke plugins in stage 50 */ mk_plugin_stage_run(MK_PLUGIN_STAGE_50, remote_fd, NULL, NULL, NULL); - pthread_mutex_lock(&mutex_sched_active_connections); - sched->active_connections -= 1; - pthread_mutex_unlock(&mutex_sched_active_connections); + sched->closed_connections++; /* Change node status */ sc->status = MK_SCHEDULER_CONN_AVAILABLE; diff --git a/src/monkey.c b/src/monkey.c index 1c70257..630d23d 100644 --- a/src/monkey.c +++ b/src/monkey.c @@ -153,9 +153,6 @@ int main(int argc, char **argv) mk_sched_init(); mk_plugin_init(); - /* FIXME: temporal mutex */ - pthread_mutex_init(&mutex_sched_active_connections, (pthread_mutexattr_t *) NULL); - /* Server listening socket */ config->server_fd = mk_socket_server(config->serverport, config->listen_addr); -- 1.7.2.1
_______________________________________________ Monkey mailing list [email protected] http://lists.monkey-project.com/listinfo/monkey
