Can you resubmit as a unified diff?

On Tue, 17 Aug 2010, ilnarb wrote:

> I suggest you to eliminate taking of the lock on cqi_freelist.
> In order to it we should done all work on cqi_freelist by one thread
> -- dispatcher thread in memcached.
> I made some changes in queue code, including improvements for cache
> line.
> It will work on any platform.
> There are patch file (diff -C3 memcached-1.4.5/thread.c <new
> thread.c>)
>
> *** thread.c.orig       Sat Apr  3 11:07:16 2010
> --- thread.c    Tue Aug 17 14:09:28 2010
> ***************
> *** 12,16 ****
>   #include <pthread.h>
>
> ! #define ITEMS_PER_ALLOC 64
>
>   /* An item in the connection queue. */
> --- 12,17 ----
>   #include <pthread.h>
>
> ! #define CACHE_LINE_SIZE 64
> ! #define ITEMS_PER_ALLOC 256
>
>   /* An item in the connection queue. */
> ***************
> *** 29,35 ****
>   struct conn_queue {
>       CQ_ITEM *head;
>       CQ_ITEM *tail;
>       pthread_mutex_t lock;
> !     pthread_cond_t  cond;
>   };
>
> --- 30,40 ----
>   struct conn_queue {
>       CQ_ITEM *head;
> +     char pad0[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
> +     CQ_ITEM *divider;
> +     char pad1[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
>       CQ_ITEM *tail;
> +     char pad2[CACHE_LINE_SIZE - sizeof(CQ_ITEM *)];
>       pthread_mutex_t lock;
> !     char pad3[CACHE_LINE_SIZE - sizeof(pthread_mutex_t)];
>   };
>
> ***************
> *** 45,49 ****
>   /* Free list of CQ_ITEM structs */
>   static CQ_ITEM *cqi_freelist;
> - static pthread_mutex_t cqi_freelist_lock;
>
>   static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
> --- 50,53 ----
> ***************
> *** 65,68 ****
> --- 69,75 ----
>   static void thread_libevent_process(int fd, short which, void *arg);
>
> + static CQ_ITEM *cqi_new(void);
> + static void cqi_free(CQ_ITEM *item);
> +
>   /*
>    * Initializes a connection queue.
> ***************
> *** 70,76 ****
>   static void cq_init(CQ *cq) {
>       pthread_mutex_init(&cq->lock, NULL);
> !     pthread_cond_init(&cq->cond, NULL);
> !     cq->head = NULL;
> !     cq->tail = NULL;
>   }
>
> --- 77,81 ----
>   static void cq_init(CQ *cq) {
>       pthread_mutex_init(&cq->lock, NULL);
> !     cq->head = cq->divider = cq->tail = cqi_new();
>   }
>
> ***************
> *** 78,96 ****
>    * Looks for an item on a connection queue, but doesn't block if
> there isn't
>    * one.
> !  * Returns the item, or NULL if no item is available
>    */
> ! static CQ_ITEM *cq_pop(CQ *cq) {
> !     CQ_ITEM *item;
>
>       pthread_mutex_lock(&cq->lock);
> !     item = cq->head;
> !     if (NULL != item) {
> !         cq->head = item->next;
> !         if (NULL == cq->head)
> !             cq->tail = NULL;
>       }
>       pthread_mutex_unlock(&cq->lock);
>
> !     return item;
>   }
>
> --- 83,105 ----
>    * Looks for an item on a connection queue, but doesn't block if
> there isn't
>    * one.
> !  * Returns 1 if there are new item, or 0 if no item is available
>    */
> ! static int cq_pop(CQ *cq, CQ_ITEM *item) {
> !       int res = 0;
> !
> !     if (NULL == cq->divider->next)
> !         return 0;
>
>       pthread_mutex_lock(&cq->lock);
> !     if (NULL != cq->divider->next) {
> !         *item = *cq->divider->next;
> !         res = 1;
> !         cq->divider = cq->divider->next;
>       }
>       pthread_mutex_unlock(&cq->lock);
>
> !     item->next = NULL;
> !
> !     return res;
>   }
>
> ***************
> *** 102,112 ****
>
>       pthread_mutex_lock(&cq->lock);
> !     if (NULL == cq->tail)
> !         cq->head = item;
> !     else
> !         cq->tail->next = item;
> !     cq->tail = item;
> !     pthread_cond_signal(&cq->cond);
>       pthread_mutex_unlock(&cq->lock);
>   }
>
> --- 111,124 ----
>
>       pthread_mutex_lock(&cq->lock);
> !     cq->tail->next = item;
> !     cq->tail = cq->tail->next;
>       pthread_mutex_unlock(&cq->lock);
> +
> +     while(cq->head != cq->divider)
> +     {
> +         CQ_ITEM *tmp = cq->head;
> +         cq->head = cq->head->next;
> +         cqi_free(tmp);
> +     }
>   }
>
> ***************
> *** 116,125 ****
>   static CQ_ITEM *cqi_new(void) {
>       CQ_ITEM *item = NULL;
> -     pthread_mutex_lock(&cqi_freelist_lock);
>       if (cqi_freelist) {
>           item = cqi_freelist;
>           cqi_freelist = item->next;
>       }
> -     pthread_mutex_unlock(&cqi_freelist_lock);
>
>       if (NULL == item) {
> --- 128,135 ----
> ***************
> *** 139,148 ****
>               item[i - 1].next = &item[i];
>
> -         pthread_mutex_lock(&cqi_freelist_lock);
>           item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
>           cqi_freelist = &item[1];
> -         pthread_mutex_unlock(&cqi_freelist_lock);
>       }
>
>       return item;
>   }
> --- 149,158 ----
>               item[i - 1].next = &item[i];
>
>           item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
>           cqi_freelist = &item[1];
>       }
>
> +     item->next = NULL;
> +
>       return item;
>   }
> ***************
> *** 153,160 ****
>    */
>   static void cqi_free(CQ_ITEM *item) {
> -     pthread_mutex_lock(&cqi_freelist_lock);
>       item->next = cqi_freelist;
>       cqi_freelist = item;
> -     pthread_mutex_unlock(&cqi_freelist_lock);
>   }
>
> --- 163,168 ----
> ***************
> *** 254,258 ****
>   static void thread_libevent_process(int fd, short which, void *arg)
> {
>       LIBEVENT_THREAD *me = arg;
> !     CQ_ITEM *item;
>       char buf[1];
>
> --- 262,267 ----
>   static void thread_libevent_process(int fd, short which, void *arg)
> {
>       LIBEVENT_THREAD *me = arg;
> !     CQ_ITEM _item;
> !     CQ_ITEM *item = &_item;
>       char buf[1];
>
> ***************
> *** 261,267 ****
>               fprintf(stderr, "Can't read from libevent pipe\n");
>
> !     item = cq_pop(me->new_conn_queue);
> !
> !     if (NULL != item) {
>           conn *c = conn_new(item->sfd, item->init_state, item-
> >event_flags,
>                              item->read_buffer_size, item->transport,
> me->base);
> --- 270,274 ----
>               fprintf(stderr, "Can't read from libevent pipe\n");
>
> !     if (cq_pop(me->new_conn_queue, item)) {
>           conn *c = conn_new(item->sfd, item->init_state, item-
> >event_flags,
>                              item->read_buffer_size, item->transport,
> me->base);
> ***************
> *** 280,284 ****
>               c->thread = me;
>           }
> -         cqi_free(item);
>       }
>   }
> --- 287,290 ----
> ***************
> *** 595,599 ****
>       pthread_cond_init(&init_cond, NULL);
>
> -     pthread_mutex_init(&cqi_freelist_lock, NULL);
>       cqi_freelist = NULL;
>
> --- 601,604 ----
>
>
>
> Also I tested lock- & wait- free single-producer single-consumer
> conn_queue using volatile variables. It works on my SMP server (2*Xeon
> E55**), centos 5.5, gcc 4.1. But I'm not sure that it will work on
> other platforms without memory fence instructions/calls, for example,
> on Itanium. I can send patch if anybody interested in it.
>

Reply via email to