A few small changes:
Since the dispatcher keeps the threads balanced when it dispatches
new connections, we can move the freelist into conn_queue without
running into unbalanced numbers of free items in each conn_queue.
I have also eliminated the trailing "while .. free" code in cq_push
from my previous implementation -- new items are now taken from the
head of the queue while there are free items (from head to divider).
This makes it possible to run more than one dispatcher thread without
the synchronization problems of an unguarded global freelist.
Here is a unified diff against thread.c/1.4.5:


--- thread.c.orig       2010-04-03 11:07:16.000000000 +0400
+++ thread.c    2010-08-18 13:28:18.000000000 +0400
@@ -11,6 +11,7 @@
 #include <string.h>
 #include <pthread.h>

+#define CACHE_LINE_SIZE 64
 #define ITEMS_PER_ALLOC 64

 /* An item in the connection queue. */
@@ -28,9 +29,15 @@
 typedef struct conn_queue CQ;
 struct conn_queue {
     CQ_ITEM *head;
+    char pad0[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
+    CQ_ITEM *divider;
+    char pad1[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
     CQ_ITEM *tail;
+    char pad2[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
     pthread_mutex_t lock;
-    pthread_cond_t  cond;
+    char pad3[CACHE_LINE_SIZE - sizeof(pthread_mutex_t)];
+    CQ_ITEM *freelist;
+    char pad4[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
 };

 /* Lock for cache operations (item_*, assoc_*) */
@@ -42,10 +49,6 @@
 /* Lock for global stats */
 static pthread_mutex_t stats_lock;

-/* Free list of CQ_ITEM structs */
-static CQ_ITEM *cqi_freelist;
-static pthread_mutex_t cqi_freelist_lock;
-
 static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;

 /*
@@ -65,62 +68,18 @@
 static void thread_libevent_process(int fd, short which, void *arg);

 /*
- * Initializes a connection queue.
- */
-static void cq_init(CQ *cq) {
-    pthread_mutex_init(&cq->lock, NULL);
-    pthread_cond_init(&cq->cond, NULL);
-    cq->head = NULL;
-    cq->tail = NULL;
-}
-
-/*
- * Looks for an item on a connection queue, but doesn't block if there isn't
- * one.
- * Returns the item, or NULL if no item is available
- */
-static CQ_ITEM *cq_pop(CQ *cq) {
-    CQ_ITEM *item;
-
-    pthread_mutex_lock(&cq->lock);
-    item = cq->head;
-    if (NULL != item) {
-        cq->head = item->next;
-        if (NULL == cq->head)
-            cq->tail = NULL;
-    }
-    pthread_mutex_unlock(&cq->lock);
-
-    return item;
-}
-
-/*
- * Adds an item to a connection queue.
- */
-static void cq_push(CQ *cq, CQ_ITEM *item) {
-    item->next = NULL;
-
-    pthread_mutex_lock(&cq->lock);
-    if (NULL == cq->tail)
-        cq->head = item;
-    else
-        cq->tail->next = item;
-    cq->tail = item;
-    pthread_cond_signal(&cq->cond);
-    pthread_mutex_unlock(&cq->lock);
-}
-
-/*
  * Returns a fresh connection queue item.
  */
-static CQ_ITEM *cqi_new(void) {
+static CQ_ITEM *cq_new_item(CQ *cq) {
     CQ_ITEM *item = NULL;
-    pthread_mutex_lock(&cqi_freelist_lock);
-    if (cqi_freelist) {
-        item = cqi_freelist;
-        cqi_freelist = item->next;
+    if (cq->head && cq->head != cq->divider) {
+        item = cq->head;
+        cq->head = cq->head->next;
+    }
+    else if (cq->freelist) {
+        item = cq->freelist;
+        cq->freelist = item->next;
     }
-    pthread_mutex_unlock(&cqi_freelist_lock);

     if (NULL == item) {
         int i;
@@ -138,26 +97,60 @@
         for (i = 2; i < ITEMS_PER_ALLOC; i++)
             item[i - 1].next = &item[i];

-        pthread_mutex_lock(&cqi_freelist_lock);
-        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
-        cqi_freelist = &item[1];
-        pthread_mutex_unlock(&cqi_freelist_lock);
+        item[ITEMS_PER_ALLOC - 1].next = cq->freelist;
+        cq->freelist = &item[1];
     }

+    item->next = NULL;
+
     return item;
 }

+/*
+ * Initializes a connection queue.
+ */
+static void cq_init(CQ *cq) {
+    pthread_mutex_init(&cq->lock, NULL);
+    cq->freelist = NULL;
+    cq->head = NULL;
+    cq->head = cq->divider = cq->tail = cq_new_item(cq);
+}

 /*
- * Frees a connection queue item (adds it to the freelist.)
+ * Looks for an item on a connection queue, but doesn't block if there isn't
+ * one.
+ * Returns 1 if there is a new item, or 0 if no item is available
  */
-static void cqi_free(CQ_ITEM *item) {
-    pthread_mutex_lock(&cqi_freelist_lock);
-    item->next = cqi_freelist;
-    cqi_freelist = item;
-    pthread_mutex_unlock(&cqi_freelist_lock);
+static int cq_pop(CQ *cq, CQ_ITEM *item) {
+    int res = 0;
+
+    if (NULL == cq->divider->next)
+        return 0;
+
+    pthread_mutex_lock(&cq->lock);
+    if (NULL != cq->divider->next) {
+        *item = *cq->divider->next;
+        res = 1;
+        cq->divider = cq->divider->next;
+    }
+    pthread_mutex_unlock(&cq->lock);
+
+    item->next = NULL;
+
+    return res;
 }

+/*
+ * Adds an item to a connection queue.
+ */
+static void cq_push(CQ *cq, CQ_ITEM *item) {
+    item->next = NULL;
+
+    pthread_mutex_lock(&cq->lock);
+    cq->tail->next = item;
+    cq->tail = cq->tail->next;
+    pthread_mutex_unlock(&cq->lock);
+}

 /*
  * Creates a worker thread.
@@ -253,16 +246,15 @@
  */
 static void thread_libevent_process(int fd, short which, void *arg) {
     LIBEVENT_THREAD *me = arg;
-    CQ_ITEM *item;
+    CQ_ITEM _item;
+    CQ_ITEM *item = &_item;
     char buf[1];

     if (read(fd, buf, 1) != 1)
         if (settings.verbose > 0)
             fprintf(stderr, "Can't read from libevent pipe\n");

-    item = cq_pop(me->new_conn_queue);
-
-    if (NULL != item) {
+    if (cq_pop(me->new_conn_queue, item)) {
         conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                            item->read_buffer_size, item->transport, me->base);
         if (c == NULL) {
@@ -279,7 +271,6 @@
         } else {
             c->thread = me;
         }
-        cqi_free(item);
     }
 }

@@ -293,13 +284,15 @@
  */
 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                        int read_buffer_size, enum network_transport transport) {
-    CQ_ITEM *item = cqi_new();
+    CQ_ITEM *item = NULL;
     int tid = (last_thread + 1) % settings.num_threads;

     LIBEVENT_THREAD *thread = threads + tid;

     last_thread = tid;

+    item = cq_new_item(thread->new_conn_queue);
+
     item->sfd = sfd;
     item->init_state = init_state;
     item->event_flags = event_flags;
@@ -594,9 +587,6 @@
     pthread_mutex_init(&init_lock, NULL);
     pthread_cond_init(&init_cond, NULL);

-    pthread_mutex_init(&cqi_freelist_lock, NULL);
-    cqi_freelist = NULL;
-
     threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
     if (! threads) {
         perror("Can't allocate thread descriptors");

Reply via email to