I suggest eliminating the lock on cqi_freelist. To do that, all work on
cqi_freelist has to be done by a single thread -- the dispatcher thread in
memcached. I also made some changes to the queue code, including cache-line
padding to avoid false sharing.
It will work on any platform.
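
To make the new layout and the thread roles easier to see before reading the
diff, here is a short sketch (simplified from the patch below; CQ_ITEM and
pthread.h are as in thread.c):

/* Sketch only -- the real code is in the patch below.
 * Dispatcher thread: cqi_new(), cqi_free(), cq_push(); it is the only
 * thread that touches cqi_freelist, so the freelist lock goes away.
 * Worker threads: cq_pop(); a worker copies the node after divider and
 * advances divider; the node itself is reclaimed later by the dispatcher
 * inside cq_push(), once it is behind divider. */

#define CACHE_LINE_SIZE 64

struct conn_queue {
    CQ_ITEM *head;                                   /* dispatcher only */
    char pad0[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];  /* keep the hot fields on
                                                        separate cache lines */
    CQ_ITEM *divider;                                /* advanced by the worker */
    char pad1[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
    CQ_ITEM *tail;                                   /* dispatcher only */
    char pad2[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
    pthread_mutex_t lock;                            /* still taken in push/pop */
    char pad3[CACHE_LINE_SIZE - sizeof(pthread_mutex_t)];
};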
Here is the patch file (diff -C3 memcached-1.4.5/thread.c <new thread.c>):
*** thread.c.orig Sat Apr 3 11:07:16 2010
--- thread.c Tue Aug 17 14:09:28 2010
***************
*** 12,16 ****
#include <pthread.h>
! #define ITEMS_PER_ALLOC 64
/* An item in the connection queue. */
--- 12,17 ----
#include <pthread.h>
! #define CACHE_LINE_SIZE 64
! #define ITEMS_PER_ALLOC 256
/* An item in the connection queue. */
***************
*** 29,35 ****
struct conn_queue {
CQ_ITEM *head;
CQ_ITEM *tail;
pthread_mutex_t lock;
! pthread_cond_t cond;
};
--- 30,40 ----
struct conn_queue {
CQ_ITEM *head;
+ char pad0[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
+ CQ_ITEM *divider;
+ char pad1[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
CQ_ITEM *tail;
+ char pad2[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
pthread_mutex_t lock;
! char pad3[CACHE_LINE_SIZE - sizeof(pthread_mutex_t)];
};
***************
*** 45,49 ****
/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
- static pthread_mutex_t cqi_freelist_lock;
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
--- 50,53 ----
***************
*** 65,68 ****
--- 69,75 ----
static void thread_libevent_process(int fd, short which, void *arg);
+ static CQ_ITEM *cqi_new(void);
+ static void cqi_free(CQ_ITEM *item);
+
/*
* Initializes a connection queue.
***************
*** 70,76 ****
static void cq_init(CQ *cq) {
pthread_mutex_init(&cq->lock, NULL);
! pthread_cond_init(&cq->cond, NULL);
! cq->head = NULL;
! cq->tail = NULL;
}
--- 77,81 ----
static void cq_init(CQ *cq) {
pthread_mutex_init(&cq->lock, NULL);
! cq->head = cq->divider = cq->tail = cqi_new();
}
***************
*** 78,96 ****
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
! * Returns the item, or NULL if no item is available
*/
! static CQ_ITEM *cq_pop(CQ *cq) {
! CQ_ITEM *item;
pthread_mutex_lock(&cq->lock);
! item = cq->head;
! if (NULL != item) {
! cq->head = item->next;
! if (NULL == cq->head)
! cq->tail = NULL;
}
pthread_mutex_unlock(&cq->lock);
! return item;
}
--- 83,105 ----
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
! * Returns 1 if there is a new item, or 0 if no item is available
*/
! static int cq_pop(CQ *cq, CQ_ITEM *item) {
! int res = 0;
!
! if (NULL == cq->divider->next)
! return 0;
pthread_mutex_lock(&cq->lock);
! if (NULL != cq->divider->next) {
! *item = *cq->divider->next;
! res = 1;
! cq->divider = cq->divider->next;
}
pthread_mutex_unlock(&cq->lock);
! item->next = NULL;
!
! return res;
}
***************
*** 102,112 ****
pthread_mutex_lock(&cq->lock);
! if (NULL == cq->tail)
! cq->head = item;
! else
! cq->tail->next = item;
! cq->tail = item;
! pthread_cond_signal(&cq->cond);
pthread_mutex_unlock(&cq->lock);
}
--- 111,124 ----
pthread_mutex_lock(&cq->lock);
! cq->tail->next = item;
! cq->tail = cq->tail->next;
pthread_mutex_unlock(&cq->lock);
+
+ while(cq->head != cq->divider)
+ {
+ CQ_ITEM *tmp = cq->head;
+ cq->head = cq->head->next;
+ cqi_free(tmp);
+ }
}
***************
*** 116,125 ****
static CQ_ITEM *cqi_new(void) {
CQ_ITEM *item = NULL;
- pthread_mutex_lock(&cqi_freelist_lock);
if (cqi_freelist) {
item = cqi_freelist;
cqi_freelist = item->next;
}
- pthread_mutex_unlock(&cqi_freelist_lock);
if (NULL == item) {
--- 128,135 ----
***************
*** 139,148 ****
item[i - 1].next = &item[i];
- pthread_mutex_lock(&cqi_freelist_lock);
item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
cqi_freelist = &item[1];
- pthread_mutex_unlock(&cqi_freelist_lock);
}
return item;
}
--- 149,158 ----
item[i - 1].next = &item[i];
item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
cqi_freelist = &item[1];
}
+ item->next = NULL;
+
return item;
}
***************
*** 153,160 ****
*/
static void cqi_free(CQ_ITEM *item) {
- pthread_mutex_lock(&cqi_freelist_lock);
item->next = cqi_freelist;
cqi_freelist = item;
- pthread_mutex_unlock(&cqi_freelist_lock);
}
--- 163,168 ----
***************
*** 254,258 ****
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
! CQ_ITEM *item;
char buf[1];
--- 262,267 ----
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
! CQ_ITEM _item;
! CQ_ITEM *item = &_item;
char buf[1];
***************
*** 261,267 ****
fprintf(stderr, "Can't read from libevent pipe\n");
! item = cq_pop(me->new_conn_queue);
!
! if (NULL != item) {
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport, me->base);
--- 270,274 ----
fprintf(stderr, "Can't read from libevent pipe\n");
! if (cq_pop(me->new_conn_queue, item)) {
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport, me->base);
***************
*** 280,284 ****
c->thread = me;
}
- cqi_free(item);
}
}
--- 287,290 ----
***************
*** 595,599 ****
pthread_cond_init(&init_cond, NULL);
- pthread_mutex_init(&cqi_freelist_lock, NULL);
cqi_freelist = NULL;
--- 601,604 ----
I also tested a lock- and wait-free single-producer/single-consumer
conn_queue that uses volatile variables. It works on my SMP server (2x Xeon
E55**, CentOS 5.5, gcc 4.1), but I'm not sure it will work on other
platforms without memory fence instructions/calls, for example on Itanium.
I can send the patch if anybody is interested in it.
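
Roughly, that lock-free version looks like this (a simplified illustration
only, not the actual patch; it uses a fixed-size ring buffer instead of the
linked list above, and the size and names are made up for the example):

/* Illustration only.  Single producer (dispatcher) / single consumer
 * (worker) ring buffer with volatile indices instead of a mutex.  It
 * relies on the strong load/store ordering of x86; on weakly ordered
 * CPUs (Itanium, POWER, ...) explicit memory barriers would be needed
 * around the index updates. */

#define CQ_RING_SIZE 1024   /* must be a power of two; size picked arbitrarily */

struct conn_queue_lf {
    CQ_ITEM items[CQ_RING_SIZE];
    volatile unsigned int write_idx;   /* written only by the dispatcher */
    char pad[CACHE_LINE_SIZE - sizeof(unsigned int)];
    volatile unsigned int read_idx;    /* written only by the worker */
};

/* Producer side (dispatcher thread).  Returns 0 if the ring is full. */
static int cq_lf_push(struct conn_queue_lf *cq, const CQ_ITEM *item) {
    unsigned int next = (cq->write_idx + 1) & (CQ_RING_SIZE - 1);
    if (next == cq->read_idx)
        return 0;                      /* full */
    cq->items[cq->write_idx] = *item;  /* fill the slot first ... */
    cq->write_idx = next;              /* ... then publish it to the reader */
    return 1;
}

/* Consumer side (worker thread).  Returns 0 if the ring is empty. */
static int cq_lf_pop(struct conn_queue_lf *cq, CQ_ITEM *item) {
    if (cq->read_idx == cq->write_idx)
        return 0;                      /* empty */
    *item = cq->items[cq->read_idx];   /* read the slot first ... */
    cq->read_idx = (cq->read_idx + 1) & (CQ_RING_SIZE - 1);   /* ... then release it */
    return 1;
}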