--- thread.c.orig       2010-04-03 11:07:16.000000000 +0400
+++ thread.c    2010-08-18 13:03:50.000000000 +0400
@@ -11,7 +11,8 @@
 #include <string.h>
 #include <pthread.h>

-#define ITEMS_PER_ALLOC 64
+#define CACHE_LINE_SIZE 64
+#define ITEMS_PER_ALLOC 256

 /* An item in the connection queue. */
 typedef struct conn_queue_item CQ_ITEM;
@@ -28,9 +29,13 @@
 typedef struct conn_queue CQ;
 struct conn_queue {
     CQ_ITEM *head;
+    char pad0[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
+    CQ_ITEM *divider;
+    char pad1[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
     CQ_ITEM *tail;
+    char pad2[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
     pthread_mutex_t lock;
-    pthread_cond_t  cond;
+    char pad3[CACHE_LINE_SIZE - sizeof(pthread_mutex_t)];
 };

 /* Lock for cache operations (item_*, assoc_*) */
@@ -44,7 +49,6 @@

 /* Free list of CQ_ITEM structs */
 static CQ_ITEM *cqi_freelist;
-static pthread_mutex_t cqi_freelist_lock;

 static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;

@@ -64,34 +68,39 @@

 static void thread_libevent_process(int fd, short which, void *arg);

+static CQ_ITEM *cqi_new(void);
+static void cqi_free(CQ_ITEM *item);
+
 /*
  * Initializes a connection queue.
  */
 static void cq_init(CQ *cq) {
     pthread_mutex_init(&cq->lock, NULL);
-    pthread_cond_init(&cq->cond, NULL);
-    cq->head = NULL;
-    cq->tail = NULL;
+    cq->head = cq->divider = cq->tail = cqi_new();
 }

 /*
  * Looks for an item on a connection queue, but doesn't block if
there isn't
  * one.
- * Returns the item, or NULL if no item is available
+ * Returns 1 if there are new item, or 0 if no item is available
  */
-static CQ_ITEM *cq_pop(CQ *cq) {
-    CQ_ITEM *item;
+static int cq_pop(CQ *cq, CQ_ITEM *item) {
+       int res = 0;
+
+    if (NULL == cq->divider->next)
+        return 0;

     pthread_mutex_lock(&cq->lock);
-    item = cq->head;
-    if (NULL != item) {
-        cq->head = item->next;
-        if (NULL == cq->head)
-            cq->tail = NULL;
+    if (NULL != cq->divider->next) {
+        *item = *cq->divider->next;
+        res = 1;
+        cq->divider = cq->divider->next;
     }
     pthread_mutex_unlock(&cq->lock);

-    return item;
+    item->next = NULL;
+
+    return res;
 }

 /*
@@ -101,13 +110,16 @@
     item->next = NULL;

     pthread_mutex_lock(&cq->lock);
-    if (NULL == cq->tail)
-        cq->head = item;
-    else
-        cq->tail->next = item;
-    cq->tail = item;
-    pthread_cond_signal(&cq->cond);
+    cq->tail->next = item;
+    cq->tail = cq->tail->next;
     pthread_mutex_unlock(&cq->lock);
+
+    while(cq->head != cq->divider)
+    {
+        CQ_ITEM *tmp = cq->head;
+        cq->head = cq->head->next;
+        cqi_free(tmp);
+    }
 }

 /*
@@ -115,12 +127,10 @@
  */
 static CQ_ITEM *cqi_new(void) {
     CQ_ITEM *item = NULL;
-    pthread_mutex_lock(&cqi_freelist_lock);
     if (cqi_freelist) {
         item = cqi_freelist;
         cqi_freelist = item->next;
     }
-    pthread_mutex_unlock(&cqi_freelist_lock);

     if (NULL == item) {
         int i;
@@ -138,12 +148,12 @@
         for (i = 2; i < ITEMS_PER_ALLOC; i++)
             item[i - 1].next = &item[i];

-        pthread_mutex_lock(&cqi_freelist_lock);
         item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
         cqi_freelist = &item[1];
-        pthread_mutex_unlock(&cqi_freelist_lock);
     }

+    item->next = NULL;
+
     return item;
 }

@@ -152,10 +162,8 @@
  * Frees a connection queue item (adds it to the freelist.)
  */
 static void cqi_free(CQ_ITEM *item) {
-    pthread_mutex_lock(&cqi_freelist_lock);
     item->next = cqi_freelist;
     cqi_freelist = item;
-    pthread_mutex_unlock(&cqi_freelist_lock);
 }


@@ -253,16 +261,15 @@
  */
 static void thread_libevent_process(int fd, short which, void *arg) {
     LIBEVENT_THREAD *me = arg;
-    CQ_ITEM *item;
+    CQ_ITEM _item;
+    CQ_ITEM *item = &_item;
     char buf[1];

     if (read(fd, buf, 1) != 1)
         if (settings.verbose > 0)
             fprintf(stderr, "Can't read from libevent pipe\n");

-    item = cq_pop(me->new_conn_queue);
-
-    if (NULL != item) {
+    if (cq_pop(me->new_conn_queue, item)) {
         conn *c = conn_new(item->sfd, item->init_state, item-
>event_flags,
                            item->read_buffer_size, item->transport,
me->base);
         if (c == NULL) {
@@ -279,7 +286,6 @@
         } else {
             c->thread = me;
         }
-        cqi_free(item);
     }
 }

@@ -594,7 +600,6 @@
     pthread_mutex_init(&init_lock, NULL);
     pthread_cond_init(&init_cond, NULL);

-    pthread_mutex_init(&cqi_freelist_lock, NULL);
     cqi_freelist = NULL;

     threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));

Reply via email to