Can you resubmit as a unified diff?
On Tue, 17 Aug 2010, ilnarb wrote:
> I suggest eliminating the lock on cqi_freelist.
> To do that, all work on cqi_freelist should be done by a single thread
> -- the dispatcher thread in memcached.
> I made some changes to the queue code, including cache-line
> padding improvements.
> It will work on any platform.
> Here is the patch (diff -C3 memcached-1.4.5/thread.c <new
> thread.c>)
>
> *** thread.c.orig Sat Apr 3 11:07:16 2010
> --- thread.c Tue Aug 17 14:09:28 2010
> ***************
> *** 12,16 ****
> #include <pthread.h>
>
> ! #define ITEMS_PER_ALLOC 64
>
> /* An item in the connection queue. */
> --- 12,17 ----
> #include <pthread.h>
>
> ! #define CACHE_LINE_SIZE 64
> ! #define ITEMS_PER_ALLOC 256
>
> /* An item in the connection queue. */
> ***************
> *** 29,35 ****
> struct conn_queue {
> CQ_ITEM *head;
> CQ_ITEM *tail;
> pthread_mutex_t lock;
> ! pthread_cond_t cond;
> };
>
> --- 30,40 ----
> struct conn_queue {
> CQ_ITEM *head;
> + char pad0[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
> + CQ_ITEM *divider;
> + char pad1[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
> CQ_ITEM *tail;
> + char pad2[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
> pthread_mutex_t lock;
> ! char pad3[CACHE_LINE_SIZE - sizeof(pthread_mutex_t)];
> };
>
> ***************
> *** 45,49 ****
> /* Free list of CQ_ITEM structs */
> static CQ_ITEM *cqi_freelist;
> - static pthread_mutex_t cqi_freelist_lock;
>
> static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
> --- 50,53 ----
> ***************
> *** 65,68 ****
> --- 69,75 ----
> static void thread_libevent_process(int fd, short which, void *arg);
>
> + static CQ_ITEM *cqi_new(void);
> + static void cqi_free(CQ_ITEM *item);
> +
> /*
> * Initializes a connection queue.
> ***************
> *** 70,76 ****
> static void cq_init(CQ *cq) {
> pthread_mutex_init(&cq->lock, NULL);
> ! pthread_cond_init(&cq->cond, NULL);
> ! cq->head = NULL;
> ! cq->tail = NULL;
> }
>
> --- 77,81 ----
> static void cq_init(CQ *cq) {
> pthread_mutex_init(&cq->lock, NULL);
> ! cq->head = cq->divider = cq->tail = cqi_new();
> }
>
> ***************
> *** 78,96 ****
> * Looks for an item on a connection queue, but doesn't block if
> there isn't
> * one.
> ! * Returns the item, or NULL if no item is available
> */
> ! static CQ_ITEM *cq_pop(CQ *cq) {
> ! CQ_ITEM *item;
>
> pthread_mutex_lock(&cq->lock);
> ! item = cq->head;
> ! if (NULL != item) {
> ! cq->head = item->next;
> ! if (NULL == cq->head)
> ! cq->tail = NULL;
> }
> pthread_mutex_unlock(&cq->lock);
>
> ! return item;
> }
>
> --- 83,105 ----
> * Looks for an item on a connection queue, but doesn't block if
> there isn't
> * one.
> ! * Returns 1 if there are new item, or 0 if no item is available
> */
> ! static int cq_pop(CQ *cq, CQ_ITEM *item) {
> ! int res = 0;
> !
> ! if (NULL == cq->divider->next)
> ! return 0;
>
> pthread_mutex_lock(&cq->lock);
> ! if (NULL != cq->divider->next) {
> ! *item = *cq->divider->next;
> ! res = 1;
> ! cq->divider = cq->divider->next;
> }
> pthread_mutex_unlock(&cq->lock);
>
> ! item->next = NULL;
> !
> ! return res;
> }
>
> ***************
> *** 102,112 ****
>
> pthread_mutex_lock(&cq->lock);
> ! if (NULL == cq->tail)
> ! cq->head = item;
> ! else
> ! cq->tail->next = item;
> ! cq->tail = item;
> ! pthread_cond_signal(&cq->cond);
> pthread_mutex_unlock(&cq->lock);
> }
>
> --- 111,124 ----
>
> pthread_mutex_lock(&cq->lock);
> ! cq->tail->next = item;
> ! cq->tail = cq->tail->next;
> pthread_mutex_unlock(&cq->lock);
> +
> + while(cq->head != cq->divider)
> + {
> + CQ_ITEM *tmp = cq->head;
> + cq->head = cq->head->next;
> + cqi_free(tmp);
> + }
> }
>
> ***************
> *** 116,125 ****
> static CQ_ITEM *cqi_new(void) {
> CQ_ITEM *item = NULL;
> - pthread_mutex_lock(&cqi_freelist_lock);
> if (cqi_freelist) {
> item = cqi_freelist;
> cqi_freelist = item->next;
> }
> - pthread_mutex_unlock(&cqi_freelist_lock);
>
> if (NULL == item) {
> --- 128,135 ----
> ***************
> *** 139,148 ****
> item[i - 1].next = &item[i];
>
> - pthread_mutex_lock(&cqi_freelist_lock);
> item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
> cqi_freelist = &item[1];
> - pthread_mutex_unlock(&cqi_freelist_lock);
> }
>
> return item;
> }
> --- 149,158 ----
> item[i - 1].next = &item[i];
>
> item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
> cqi_freelist = &item[1];
> }
>
> + item->next = NULL;
> +
> return item;
> }
> ***************
> *** 153,160 ****
> */
> static void cqi_free(CQ_ITEM *item) {
> - pthread_mutex_lock(&cqi_freelist_lock);
> item->next = cqi_freelist;
> cqi_freelist = item;
> - pthread_mutex_unlock(&cqi_freelist_lock);
> }
>
> --- 163,168 ----
> ***************
> *** 254,258 ****
> static void thread_libevent_process(int fd, short which, void *arg)
> {
> LIBEVENT_THREAD *me = arg;
> ! CQ_ITEM *item;
> char buf[1];
>
> --- 262,267 ----
> static void thread_libevent_process(int fd, short which, void *arg)
> {
> LIBEVENT_THREAD *me = arg;
> ! CQ_ITEM _item;
> ! CQ_ITEM *item = &_item;
> char buf[1];
>
> ***************
> *** 261,267 ****
> fprintf(stderr, "Can't read from libevent pipe\n");
>
> ! item = cq_pop(me->new_conn_queue);
> !
> ! if (NULL != item) {
> conn *c = conn_new(item->sfd, item->init_state, item-
> >event_flags,
> item->read_buffer_size, item->transport,
> me->base);
> --- 270,274 ----
> fprintf(stderr, "Can't read from libevent pipe\n");
>
> ! if (cq_pop(me->new_conn_queue, item)) {
> conn *c = conn_new(item->sfd, item->init_state, item-
> >event_flags,
> item->read_buffer_size, item->transport,
> me->base);
> ***************
> *** 280,284 ****
> c->thread = me;
> }
> - cqi_free(item);
> }
> }
> --- 287,290 ----
> ***************
> *** 595,599 ****
> pthread_cond_init(&init_cond, NULL);
>
> - pthread_mutex_init(&cqi_freelist_lock, NULL);
> cqi_freelist = NULL;
>
> --- 601,604 ----
>
>
>
Also I tested a lock- and wait-free single-producer single-consumer
conn_queue using volatile variables. It works on my SMP server (2*Xeon
E55**), CentOS 5.5, gcc 4.1. But I'm not sure that it will work on
other platforms without memory fence instructions/calls, for example,
on Itanium. I can send the patch if anybody is interested in it.