--- thread.c.orig 2010-04-03 11:07:16.000000000 +0400
+++ thread.c 2010-08-18 13:03:50.000000000 +0400
@@ -11,7 +11,8 @@
#include <string.h>
#include <pthread.h>
-#define ITEMS_PER_ALLOC 64
+#define CACHE_LINE_SIZE 64
+#define ITEMS_PER_ALLOC 256
/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
@@ -28,9 +29,13 @@
typedef struct conn_queue CQ;
struct conn_queue {
CQ_ITEM *head;
+ char pad0[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
+ CQ_ITEM *divider;
+ char pad1[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
CQ_ITEM *tail;
+ char pad2[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
pthread_mutex_t lock;
- pthread_cond_t cond;
+ char pad3[CACHE_LINE_SIZE - sizeof(pthread_mutex_t)];
};
/* Lock for cache operations (item_*, assoc_*) */
@@ -44,7 +49,6 @@
/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
-static pthread_mutex_t cqi_freelist_lock;
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
@@ -64,34 +68,39 @@
static void thread_libevent_process(int fd, short which, void *arg);
+static CQ_ITEM *cqi_new(void);
+static void cqi_free(CQ_ITEM *item);
+
/*
* Initializes a connection queue.
*/
static void cq_init(CQ *cq) {
pthread_mutex_init(&cq->lock, NULL);
- pthread_cond_init(&cq->cond, NULL);
- cq->head = NULL;
- cq->tail = NULL;
+ cq->head = cq->divider = cq->tail = cqi_new();
}
/*
* Looks for an item on a connection queue, but doesn't block if there isn't
* one.
- * Returns the item, or NULL if no item is available
+ * Returns 1 if there is a new item (copied into *item), or 0 if no item is available
*/
-static CQ_ITEM *cq_pop(CQ *cq) {
- CQ_ITEM *item;
+static int cq_pop(CQ *cq, CQ_ITEM *item) {
+ int res = 0;
+
+ if (NULL == cq->divider->next)
+ return 0;
pthread_mutex_lock(&cq->lock);
- item = cq->head;
- if (NULL != item) {
- cq->head = item->next;
- if (NULL == cq->head)
- cq->tail = NULL;
+ if (NULL != cq->divider->next) {
+ *item = *cq->divider->next;
+ res = 1;
+ cq->divider = cq->divider->next;
}
pthread_mutex_unlock(&cq->lock);
- return item;
+ item->next = NULL;
+
+ return res;
}
/*
@@ -101,13 +110,16 @@
item->next = NULL;
pthread_mutex_lock(&cq->lock);
- if (NULL == cq->tail)
- cq->head = item;
- else
- cq->tail->next = item;
- cq->tail = item;
- pthread_cond_signal(&cq->cond);
+ cq->tail->next = item;
+ cq->tail = cq->tail->next;
pthread_mutex_unlock(&cq->lock);
+
+ while(cq->head != cq->divider)
+ {
+ CQ_ITEM *tmp = cq->head;
+ cq->head = cq->head->next;
+ cqi_free(tmp);
+ }
}
/*
@@ -115,12 +127,10 @@
*/
static CQ_ITEM *cqi_new(void) {
CQ_ITEM *item = NULL;
- pthread_mutex_lock(&cqi_freelist_lock);
if (cqi_freelist) {
item = cqi_freelist;
cqi_freelist = item->next;
}
- pthread_mutex_unlock(&cqi_freelist_lock);
if (NULL == item) {
int i;
@@ -138,12 +148,12 @@
for (i = 2; i < ITEMS_PER_ALLOC; i++)
item[i - 1].next = &item[i];
- pthread_mutex_lock(&cqi_freelist_lock);
item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
cqi_freelist = &item[1];
- pthread_mutex_unlock(&cqi_freelist_lock);
}
+ item->next = NULL;
+
return item;
}
@@ -152,10 +162,8 @@
* Frees a connection queue item (adds it to the freelist.)
*/
static void cqi_free(CQ_ITEM *item) {
- pthread_mutex_lock(&cqi_freelist_lock);
item->next = cqi_freelist;
cqi_freelist = item;
- pthread_mutex_unlock(&cqi_freelist_lock);
}
@@ -253,16 +261,15 @@
*/
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
- CQ_ITEM *item;
+ CQ_ITEM _item;
+ CQ_ITEM *item = &_item;
char buf[1];
if (read(fd, buf, 1) != 1)
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");
- item = cq_pop(me->new_conn_queue);
-
- if (NULL != item) {
+ if (cq_pop(me->new_conn_queue, item)) {
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport,
me->base);
if (c == NULL) {
@@ -279,7 +286,6 @@
} else {
c->thread = me;
}
- cqi_free(item);
}
}
@@ -594,7 +600,6 @@
pthread_mutex_init(&init_lock, NULL);
pthread_cond_init(&init_cond, NULL);
- pthread_mutex_init(&cqi_freelist_lock, NULL);
cqi_freelist = NULL;
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));