It seems that a locking command is something that has long been requested, but never needed enough to actually be implemented. At GameLogic, I've created a set of mutually-exclusive, advisory locking commands. It works like this:

'lock key-name' - locks the item and performs the equivalent of a 'gets' on the key.
'unlock key-name' - drops the lock. If any other clients are waiting for the lock, the key's value is transmitted to the next client (FIFO).
'cas key-name' - updates and unlocks the item (checking the cas). If any other clients are waiting for the lock, the key's new value is transmitted to the next client (FIFO).

A given connection may only lock one key at a time (which keeps things simple and prevents deadlocks). A second connection that requests a lock on the same key will block until the item is unlocked. The locks are advisory: if another client performs a 'get' or 'gets' on the key, it will be allowed to proceed normally. However, in order to unlock the item, you must perform a 'cas' command.

The implementation is currently missing:
'trylock' - command that attempts to lock, but will fail immediately if the item is already locked.

We've been using this patch in production for about a month now at GameLogic. I'm fairly confident that the functionality that is implemented is reasonably bulletproof, at least in our limited usage of memcached.

- Peter Kovacs
diff -Naur memcached-1.2.6/ChangeLog memcached-1.2.6p/ChangeLog
--- memcached-1.2.6/ChangeLog   2008-07-29 12:37:27.000000000 -0400
+++ memcached-1.2.6p/ChangeLog  2008-10-02 10:30:58.000000000 -0400
@@ -1,3 +1,8 @@
+2008-10-02
+
+       * Add support for mutually-exclusive advisory 'lock' command
+         (GameLogic.com)
+
 2008-07-24 [Version 1.2.6-rc1 released]
 
        * Add support for newer automake (Facebook)
diff -Naur memcached-1.2.6/assoc.h memcached-1.2.6p/assoc.h
--- memcached-1.2.6/assoc.h     2008-04-26 17:58:33.000000000 -0400
+++ memcached-1.2.6p/assoc.h    2008-10-02 10:19:37.000000000 -0400
@@ -1,3 +1,6 @@
+#ifndef __MEMCACHE_ASSOC_H__
+#define __MEMCACHE_ASSOC_H__
+
 /* associative array */
 void assoc_init(void);
 item *assoc_find(const char *key, const size_t nkey);
@@ -5,3 +8,5 @@
 void assoc_delete(const char *key, const size_t nkey);
 void do_assoc_move_next_bucket(void);
 uint32_t hash( const void *key, size_t length, const uint32_t initval);
+
+#endif
diff -Naur memcached-1.2.6/items.c memcached-1.2.6p/items.c
--- memcached-1.2.6/items.c     2008-07-29 12:37:27.000000000 -0400
+++ memcached-1.2.6p/items.c    2008-10-02 10:19:37.000000000 -0400
@@ -1,6 +1,8 @@
 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/* vim:set sts=4 sw=4 */
 /* $Id$ */
 #include "memcached.h"
+#include "items.h"
 #include <sys/stat.h>
 #include <sys/socket.h>
 #include <sys/signal.h>
@@ -53,18 +55,6 @@
     return ++cas_id;
 }
 
-/* Enable this for reference-count debugging. */
-#if 0
-# define DEBUG_REFCNT(it,op) \
-                fprintf(stderr, "item %x refcnt(%c) %d %c%c%c\n", \
-                        it, op, it->refcount, \
-                        (it->it_flags & ITEM_LINKED) ? 'L' : ' ', \
-                        (it->it_flags & ITEM_SLABBED) ? 'S' : ' ', \
-                        (it->it_flags & ITEM_DELETED) ? 'D' : ' ')
-#else
-# define DEBUG_REFCNT(it,op) while(0)
-#endif
-
 /**
  * Generates the variable-sized part of the header for an object.
  *
@@ -156,6 +146,7 @@
     it->exptime = exptime;
     memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
     it->nsuffix = nsuffix;
+    it->lock = 0;
     return it;
 }
 
@@ -291,9 +282,17 @@
 }
 
 int do_item_replace(item *it, item *new_it) {
+    conn *c;
+
     MEMCACHED_ITEM_REPLACE(ITEM_key(it), it->nbytes,
-                           ITEM_key(new_it), new_it->nbytes);
+                          ITEM_key(new_it), new_it->nbytes);
     assert((it->it_flags & ITEM_SLABBED) == 0);
+    assert( new_it->lock == NULL );
+
+    new_it->lock = it->lock;
+    for( c = new_it->lock; c; c = c->lock.next )
+       c->lock.ilock = new_it;
+    it->lock = NULL;
 
     do_item_unlink(it);
     return do_item_link(new_it);
@@ -421,7 +420,10 @@
 item *do_item_get_notedeleted(const char *key, const size_t nkey, bool 
*delete_locked) {
     item *it = assoc_find(key, nkey);
     if (delete_locked) *delete_locked = false;
-    if (it != NULL && (it->it_flags & ITEM_DELETED)) {
+
+    /* don't allow locked items to be deleted. */
+
+    if (it != NULL && it->lock == NULL && (it->it_flags & ITEM_DELETED)) {
         /* it's flagged as delete-locked.  let's see if that condition
            is past due, and the 5-second delete_timer just hasn't
            gotten to it yet... */
@@ -430,12 +432,14 @@
             it = NULL;
         }
     }
-    if (it != NULL && settings.oldest_live != 0 && settings.oldest_live <= 
current_time &&
+    if (it != NULL && it->lock == NULL && 
+       settings.oldest_live != 0 && settings.oldest_live <= current_time &&
         it->time <= settings.oldest_live) {
         do_item_unlink(it);           /* MTSAFE - cache_lock held */
         it = NULL;
     }
-    if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
+    if (it != NULL && it->lock == NULL && 
+       it->exptime != 0 && it->exptime <= current_time) {
         do_item_unlink(it);           /* MTSAFE - cache_lock held */
         it = NULL;
     }
@@ -486,3 +490,4 @@
         }
     }
 }
+
diff -Naur memcached-1.2.6/items.h memcached-1.2.6p/items.h
--- memcached-1.2.6/items.h     2008-04-27 20:37:56.000000000 -0400
+++ memcached-1.2.6p/items.h    2008-10-02 10:19:37.000000000 -0400
@@ -1,4 +1,39 @@
+#ifndef __MEMCACHE_ITEM_H__
+#define __MEMCACHE_ITEM_H__
+
+/* Enable this for reference-count debugging. */
+#if 0
+# define DEBUG_REFCNT(it,op) \
+                fprintf(stderr, "item %x refcnt(%c) %d %c%c%c\n", \
+                        it, op, it->refcount, \
+                        (it->it_flags & ITEM_LINKED) ? 'L' : ' ', \
+                        (it->it_flags & ITEM_SLABBED) ? 'S' : ' ', \
+                        (it->it_flags & ITEM_DELETED) ? 'D' : ' ')
+#else
+# define DEBUG_REFCNT(it,op) while(0)
+#endif
+
+#if 0
+# define DEBUG_LOCK( it ) \
+    do \
+    { \
+        conn *xxx_; \
+        fprintf( stderr, "ITEM %p LOCK ", it ); \
+        for( xxx_ = it->lock; xxx_; xxx_ = xxx_->lock.next ) \
+        { \
+            fprintf( stderr, "->%p", xxx_ ); \
+            if( xxx_->lock.next ) \
+            { assert( xxx_->lock.next->lock.prev == xxx_ ); } \
+        } \
+        fprintf( stderr, "\n" ); \
+    } while( 0 ) 
+#else
+# define DEBUG_LOCK( it ) while(0)
+#endif
+
+
 /* See items.c */
+/** Initialize item table. */
 void item_init(void);
 /[EMAIL PROTECTED]@*/
 item *do_item_alloc(char *key, const size_t nkey, const int flags, const 
rel_time_t exptime, const int nbytes);
@@ -22,3 +57,5 @@
 
 item *do_item_get_notedeleted(const char *key, const size_t nkey, bool 
*delete_locked);
 item *do_item_get_nocheck(const char *key, const size_t nkey);
+
+#endif
diff -Naur memcached-1.2.6/memcached.c memcached-1.2.6p/memcached.c
--- memcached-1.2.6/memcached.c 2008-07-29 12:37:27.000000000 -0400
+++ memcached-1.2.6p/memcached.c        2008-10-02 10:19:37.000000000 -0400
@@ -1,4 +1,5 @@
 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/* vim:set sts=4 sw=4 */
 /*
  *  memcached - memory caching daemon
  *
@@ -370,6 +371,11 @@
 
     c->noreply = false;
 
+    /* Initially, we're not locking anything. */
+    c->lock.ilock = 0;
+    c->lock.next = 0;
+    c->lock.prev = 0;
+
     event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
     event_base_set(base, &c->event);
     c->ev_flags = event_flags;
@@ -392,31 +398,60 @@
     return c;
 }
 
+/*
+ * Detach connection `c` from the item lock it is involved in, if any.
+ *
+ * When `c` is the current holder (the item's `lock` pointer is `c`) the lock
+ * is released through item_unlock(). Otherwise `c` is a waiter and is spliced
+ * out of the doubly linked wait list. Either way the connection's lock
+ * bookkeeping is cleared afterwards. A connection with no lock is a no-op.
+ */
+static void unlink_lock( conn *c )
+{
+    item *locked = c->lock.ilock;
+
+    if (locked == NULL)
+        return;
+
+    if (locked->lock == c) {
+        /* We own the lock outright, so perform a real unlock. */
+        item_unlock( locked, c );
+    } else {
+        /* A waiter is never at the head of the list, so it must have a
+         * predecessor. */
+        conn *before = c->lock.prev;
+        conn *after = c->lock.next;
+
+        DEBUG_LOCK( locked );
+
+        assert( before != NULL );
+
+        before->lock.next = after;
+        if (after != NULL)
+            after->lock.prev = before;
+
+        DEBUG_LOCK( locked );
+    }
+
+    c->lock.ilock = NULL;
+    c->lock.next = NULL;
+    c->lock.prev = NULL;
+}
+
 static void conn_cleanup(conn *c) {
     assert(c != NULL);
 
+    unlink_lock( c );
+
     if (c->item) {
-        item_remove(c->item);
-        c->item = 0;
+       item_remove(c->item);
+       c->item = 0;
     }
 
     if (c->ileft != 0) {
-        for (; c->ileft > 0; c->ileft--,c->icurr++) {
-            item_remove(*(c->icurr));
-        }
+       for (; c->ileft > 0; c->ileft--,c->icurr++) {
+           item_remove(*(c->icurr));
+       }
     }
 
     if (c->suffixleft != 0) {
-        for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
-            if(suffix_add_to_freelist(*(c->suffixcurr))) {
-                free(*(c->suffixcurr));
-            }
-        }
+       for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
+           if(suffix_add_to_freelist(*(c->suffixcurr))) {
+               free(*(c->suffixcurr));
+           }
+       }
     }
 
     if (c->write_and_free) {
-        free(c->write_and_free);
-        c->write_and_free = 0;
+       free(c->write_and_free);
+       c->write_and_free = 0;
     }
 }
 
@@ -541,7 +576,7 @@
         if (state == conn_read) {
             conn_shrink(c);
             assoc_move_next_bucket();
-        }
+       }
         c->state = state;
 
         if (state == conn_write) {
@@ -777,7 +812,6 @@
     assert(c != NULL);
 
     item *it = c->item;
-    int comm = c->item_comm;
     int ret;
 
     STATS_LOCK();
@@ -787,7 +821,7 @@
     if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
         out_string(c, "CLIENT_ERROR bad data chunk");
     } else {
-      ret = store_item(it, comm);
+      ret = store_item(it, c);
       if (ret == 1) {
           out_string(c, "STORED");
 #ifdef HAVE_DTRACE
@@ -817,6 +851,8 @@
           out_string(c, "EXISTS");
       else if(ret == 3)
           out_string(c, "NOT_FOUND");
+      else if(ret == 4)
+          out_string(c, "ERROR cas only for locked items");
       else
           out_string(c, "NOT_STORED");
     }
@@ -831,7 +867,8 @@
  *
  * Returns true if the item was stored.
  */
-int do_store_item(item *it, int comm) {
+int do_store_item(item *it, conn *c) {
+    int command = c->item_comm;
     char *key = ITEM_key(it);
     bool delete_locked = false;
     item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
@@ -840,18 +877,22 @@
     item *new_it = NULL;
     int flags;
 
-    if (old_it != NULL && comm == NREAD_ADD) {
+    if (c->lock.ilock != old_it && 
+       command == NREAD_CAS )
+    { stored = 4; }
+
+    if (old_it != NULL && command == NREAD_ADD) {
         /* add only adds a nonexistent item, but promote to head of LRU */
         do_item_update(old_it);
-    } else if (!old_it && (comm == NREAD_REPLACE
-        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
+    } else if (!old_it && (command == NREAD_REPLACE
+        || command == NREAD_APPEND || command == NREAD_PREPEND))
     {
         /* replace only replaces an existing value; don't store */
-    } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
-        || comm == NREAD_APPEND || comm == NREAD_PREPEND))
+    } else if (delete_locked && (command == NREAD_REPLACE || command == 
NREAD_ADD
+        || command == NREAD_APPEND || command == NREAD_PREPEND))
     {
         /* replace and add can't override delete locks; don't store */
-    } else if (comm == NREAD_CAS) {
+    } else if (command == NREAD_CAS) {
         /* validate cas operation */
         if (delete_locked)
             old_it = do_item_get_nocheck(key, it->nkey);
@@ -861,8 +902,15 @@
           stored = 3;
         }
         else if(it->cas_id == old_it->cas_id) {
+           if (settings.verbose > 1)
+               fprintf(stderr, ">%d cas validates, lock %p\n", c->sfd, 
it->lock);
+
           // cas validates
           do_item_replace(old_it, it);
+
+         if( it->lock )
+             item_unlock( it, c );
+
           stored = 1;
         }
         else
@@ -875,7 +923,7 @@
          * atomic and thread-safe.
          */
 
-        if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
+        if (command == NREAD_APPEND || command == NREAD_PREPEND) {
 
             /* we have it and old_it here - alloc memory to hold both */
             /* flags was already lost - so recover them from ITEM_suffix(it) */
@@ -894,7 +942,7 @@
 
             /* copy data from it and old_it to new_it */
 
-            if (comm == NREAD_APPEND) {
+            if (command == NREAD_APPEND) {
                 memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
                 memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, 
ITEM_data(it), it->nbytes);
             } else {
@@ -929,6 +977,178 @@
     return stored;
 }
 
+/*
+ * Queue one "VALUE ..." response record for item `it` on connection `c`.
+ *
+ *   it          item to send; the caller already owns a reference to it
+ *   c           connection whose iov, ilist and suffixlist are appended to
+ *   num         slot in c->ilist (and in c->suffixlist when return_cas is
+ *               set) that this item occupies
+ *   return_cas  true to emit the "gets" form, which carries the CAS id
+ *
+ * Returns:
+ *    1  the record was queued and `it` was stored in c->ilist[num]
+ *    0  a list could not be grown or add_iov() failed; the item reference
+ *       has been dropped with item_remove()
+ *   -1  no CAS suffix buffer was available; the item reference has been
+ *       dropped and a SERVER_ERROR reply was queued with out_string()
+ */
+static inline int send_item_to_conn( item *it, conn *c, int num, bool return_cas ) {
+    if (num >= c->isize) {
+        item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
+        if (new_list) {
+            c->isize *= 2;
+            c->ilist = new_list;
+        } else {
+            item_remove( it );
+            return 0;
+        }
+    }
+
+    /*
+     * Construct the response. Each hit adds three elements to the
+     * outgoing data list:
+     *   "VALUE "
+     *   key
+     *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
+     */
+
+    if (return_cas) {
+        MEMCACHED_COMMAND_GETS(c->sfd, ITEM_key(it), it->nbytes,
+                               it->cas_id);
+
+        /* Grow the suffix list in step with the item list. */
+        if (num >= c->suffixsize) {
+            char **new_suffix_list = realloc(c->suffixlist,
+                                             sizeof(char *) * c->suffixsize * 2);
+            if (new_suffix_list) {
+                c->suffixsize *= 2;
+                c->suffixlist  = new_suffix_list;
+            } else {
+                item_remove( it );
+                return 0;
+            }
+        }
+
+        char *suffix = suffix_from_freelist();
+        if (suffix == NULL) {
+            out_string(c, "SERVER_ERROR out of memory making CAS suffix");
+            item_remove( it );
+            return -1;
+        }
+        *(c->suffixlist + num) = suffix;
+        /* cas_id is a uint64_t; cast so the argument matches %llu on every
+         * platform. */
+        sprintf(suffix, " %llu\r\n", (unsigned long long)it->cas_id);
+        /* The item's stored suffix ends in "\r\n"; drop those two bytes and
+         * let the CAS suffix supply the line terminator instead. */
+        if (add_iov(c, "VALUE ", 6) != 0 ||
+            add_iov(c, ITEM_key(it), it->nkey) != 0 ||
+            add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
+            add_iov(c, suffix, strlen(suffix)) != 0 ||
+            add_iov(c, ITEM_data(it), it->nbytes) != 0)
+        {
+            item_remove( it );
+            return 0;
+        }
+    } else {
+        MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nbytes);
+
+        /* Suffix and data are sent as one span starting at ITEM_suffix(). */
+        if (add_iov(c, "VALUE ", 6) != 0 ||
+            add_iov(c, ITEM_key(it), it->nkey) != 0 ||
+            add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
+        {
+            item_remove(it);
+            return 0;
+        }
+    }
+
+    if (settings.verbose > 1)
+        fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
+
+    item_update(it);
+    *(c->ilist + num) = it;
+
+    return 1;
+}
+
+
+/*
+ * Try to take the advisory lock on item `it` for connection `c`.
+ *
+ * If nobody holds the lock, `c` becomes the holder (it->lock == c).
+ * Otherwise `c` is appended to the tail of the FIFO wait list that hangs off
+ * the holder through conn.lock.next / conn.lock.prev. In both cases
+ * c->lock.ilock is set to `it`.
+ *
+ * The connection must not already hold or wait on any lock (asserted).
+ * Threaded builds reach this through mt_item_lock(), which serializes calls
+ * on stats_lock.
+ *
+ * Returns 1 if the lock was acquired, 0 if `c` was queued behind the holder.
+ */
+int do_item_lock( item *it, conn *c ) {
+
+    /* ITEM_key() yields the NUL-terminated key as a char *, which is what
+     * %s requires; the raw `end` member is an array of void *. */
+    if (settings.verbose > 1)
+        fprintf( stderr, "%d locking item %s\n", c->sfd, ITEM_key(it) );
+
+    assert( c->lock.ilock == NULL );
+    assert( c->lock.prev == NULL );
+    assert( c->lock.next == NULL );
+
+    c->lock.ilock = it;
+
+    if (it->lock) {
+        /* Already locked: walk to the last connection in the list and
+         * link ourselves in behind it. */
+        conn *tail = it->lock;
+
+        while (tail->lock.next)
+            tail = tail->lock.next;
+
+        c->lock.prev = tail;
+        tail->lock.next = c;
+
+        DEBUG_LOCK( it );
+
+        return 0;
+    }
+
+    /* Unlocked: we become the head of the list, i.e. the holder. */
+    it->lock = c;
+
+    DEBUG_LOCK( it );
+
+    return 1;
+}
+
+/*
+ * Release the advisory lock connection `c` holds on item `it` and pass it to
+ * the next waiter, if there is one.
+ *
+ * `c` must be the current holder: it->lock == c, c->lock.ilock == it and `c`
+ * has no predecessor in the list (all asserted). After the call `c` no
+ * longer references the item and the next queued connection, if any, is the
+ * new holder. That connection is sent the item in "gets" form followed by
+ * "END" and is switched to conn_mwrite so the reply is written out.
+ */
+void do_item_unlock( item *it, conn *c ) {
+    conn *next;
+
+    /* ITEM_key() yields the NUL-terminated key as a char *, which is what
+     * %s requires; the raw `end` member is an array of void *. */
+    if (settings.verbose > 1)
+        fprintf( stderr, "%d unlocking item %s\n", c->sfd, ITEM_key(it) );
+
+    /* We have to be the connection locking this item. */
+    assert( it->lock == c );
+    /* This has to be the item we're locking. */
+    assert( c->lock.ilock == it );
+    /* This connection should be at the front of the locking list. */
+    assert( c->lock.prev == NULL );
+
+    next = c->lock.next;
+    c->lock.ilock = NULL;
+    c->lock.next = NULL;
+    it->lock = next;
+
+    if (next != NULL) {
+        int sent;
+
+        next->lock.prev = NULL;
+
+        sent = send_item_to_conn( it, next, 0, true );
+        if (sent >= 0) {
+            next->icurr = next->ilist;
+            /* NOTE(review): ileft stays 0, presumably so the new holder
+             * keeps the item reference it took when it asked for the lock;
+             * confirm against the conn_mwrite completion path. */
+            next->ileft = 0;
+            next->suffixcurr = next->suffixlist;
+            /* On success a CAS suffix buffer sits in suffixlist[0]; count it
+             * so it is handed back to the freelist instead of leaking. */
+            next->suffixleft = (sent == 1) ? 1 : 0;
+
+            if (add_iov(next, "END\r\n", 5) != 0
+                || (next->udp && build_udp_headers(next) != 0)) {
+                out_string(next, "SERVER_ERROR out of memory writing get response");
+            } else {
+                conn_set_state(next, conn_mwrite);
+                next->msgcurr = 0;
+
+                if (!update_event(next, EV_WRITE | EV_PERSIST)) {
+                    if (settings.verbose > 0)
+                        fprintf(stderr, "Couldn't update event\n");
+                    /* The event update failed for the waiter, so the waiter
+                     * is the connection to close, not the unlocker. */
+                    conn_set_state(next, conn_closing);
+                }
+            }
+        }
+    }
+
+    DEBUG_LOCK( it );
+}
+
+
 typedef struct token_s {
     char *value;
     size_t length;
@@ -1274,88 +1494,21 @@
                 stats_prefix_record_get(key, NULL != it);
             }
             if (it) {
-                if (i >= c->isize) {
-                    item **new_list = realloc(c->ilist, sizeof(item *) * 
c->isize * 2);
-                    if (new_list) {
-                        c->isize *= 2;
-                        c->ilist = new_list;
-                    } else  {
-                        item_remove(it);
-                        break;
-                    }
-                }
-
-                /*
-                 * Construct the response. Each hit adds three elements to the
-                 * outgoing data list:
-                 *   "VALUE "
-                 *   key
-                 *   " " + flags + " " + data length + "\r\n" + data (with 
\r\n)
-                 */
-
-                if(return_cas == true)
-                {
-                  MEMCACHED_COMMAND_GETS(c->sfd, ITEM_key(it), it->nbytes,
-                                         it->cas_id);
-                  /* Goofy mid-flight realloc. */
-                  if (i >= c->suffixsize) {
-                    char **new_suffix_list = realloc(c->suffixlist,
-                                           sizeof(char *) * c->suffixsize * 2);
-                    if (new_suffix_list) {
-                      c->suffixsize *= 2;
-                      c->suffixlist  = new_suffix_list;
-                    } else {
-                        item_remove(it);
-                        break;
-                    }
-                  }
-
-                  suffix = suffix_from_freelist();
-                  if (suffix == NULL) {
-                    STATS_LOCK();
-                    stats.get_cmds   += stats_get_cmds;
-                    stats.get_hits   += stats_get_hits;
-                    stats.get_misses += stats_get_misses;
-                    STATS_UNLOCK();
-                    out_string(c, "SERVER_ERROR out of memory making CAS 
suffix");
-                    item_remove(it);
-                    return;
-                  }
-                  *(c->suffixlist + i) = suffix;
-                  sprintf(suffix, " %llu\r\n", it->cas_id);
-                  if (add_iov(c, "VALUE ", 6) != 0 ||
-                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
-                      add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
-                      add_iov(c, suffix, strlen(suffix)) != 0 ||
-                      add_iov(c, ITEM_data(it), it->nbytes) != 0)
-                      {
-                          item_remove(it);
-                          break;
-                      }
-                }
-                else
-                {
-                  MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nbytes);
-
-                  if (add_iov(c, "VALUE ", 6) != 0 ||
-                      add_iov(c, ITEM_key(it), it->nkey) != 0 ||
-                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 
0)
-                      {
-                          item_remove(it);
-                          break;
-                      }
-                }
-
-
-                if (settings.verbose > 1)
-                    fprintf(stderr, ">%d sending key %s\n", c->sfd, 
ITEM_key(it));
 
+               int send_result = send_item_to_conn( it, c, i, return_cas );
+               if (send_result == -1) {
+                   STATS_LOCK();
+                   stats.get_cmds   += stats_get_cmds;
+                   stats.get_hits   += stats_get_hits;
+                   stats.get_misses += stats_get_misses;
+                   STATS_UNLOCK();
+                   return;
+               } else if (send_result == 0 )
+                   break;
+                 
                 /* item_get() has incremented it->refcount for us */
                 stats_get_hits++;
-                item_update(it);
-                *(c->ilist + i) = it;
                 i++;
-
             } else {
                 stats_get_misses++;
                 if (return_cas) {
@@ -1543,6 +1696,10 @@
         out_string(c, "NOT_FOUND");
         return;
     }
+    else if( it == c->lock.ilock ) {
+       out_string(c, "ERROR cas only for locked items");
+       return;
+    }
 
     out_string(c, add_delta(c, it, incr, delta, temp));
     item_remove(it);         /* release our reference */
@@ -1648,7 +1805,9 @@
     it = item_get(key, nkey);
     if (it) {
         MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), exptime);
-        if (exptime == 0) {
+       if (it == c->lock.ilock )
+           out_string(c, "ERROR locked items may not be deleted." );
+       else if (exptime == 0) {
             item_unlink(it);
             item_remove(it);      /* release our reference */
             out_string(c, "DELETED");
@@ -1704,6 +1863,102 @@
     return;
 }
 
+/*
+ * Handle the "lock <key>" command.
+ *
+ * Replies with a CLIENT_ERROR if the connection already holds or waits on a
+ * lock, or if the key is too long, and with "END" if the key does not exist.
+ * If the lock is obtained immediately the item is returned through
+ * process_get_command() in "gets" form. Otherwise the connection is parked
+ * in conn_wait and no reply is queued here.
+ */
+static void process_lock_command(conn *c, token_t *tokens, const size_t ntokens ) {
+    const token_t *key_tok = &tokens[KEY_TOKEN];
+    item *found;
+
+    assert(c != NULL);
+
+    if (settings.managed) {
+        const int bucket = c->bucket;
+
+        if (bucket == -1) {
+            out_string(c, "CLIENT_ERROR no BG data in managed mode");
+            return;
+        }
+        /* c->bucket is left untouched: the item itself is not being
+         * fetched at this point. */
+        if (buckets[bucket] != c->gen) {
+            out_string(c, "ERROR_NOT_OWNER");
+            return;
+        }
+    }
+
+    /* One lock per connection. */
+    if (c->lock.ilock != NULL) {
+        out_string(c, "CLIENT_ERROR already locked");
+        return;
+    }
+
+    if (key_tok->length > KEY_MAX_LENGTH) {
+        out_string(c, "CLIENT_ERROR bad command line format");
+        return;
+    }
+
+    found = item_get(key_tok->value, key_tok->length);
+    if (found == NULL) {
+        out_string(c, "END");
+        return;
+    }
+
+    if (item_lock(found, c)) {
+        /* Lock is ours: answer exactly like a "gets" on the key. */
+        process_get_command(c, tokens, ntokens, true);
+    } else {
+        /* Someone else holds it: wait until the lock is handed over. */
+        conn_set_state(c, conn_wait);
+    }
+}
+
+/*
+ * Handle the "unlock" command: drop whatever lock this connection holds.
+ *
+ * Replies "OK" on success, or a CLIENT_ERROR when the connection has no
+ * lock. The tokens are not inspected; the lock to release is taken from the
+ * connection itself.
+ */
+static void process_unlock_command( conn *c, token_t *tokens, const size_t ntokens ) {
+
+    assert(c != NULL);
+
+    if (settings.managed) {
+        const int bucket = c->bucket;
+
+        if (bucket == -1) {
+            out_string(c, "CLIENT_ERROR no BG data in managed mode");
+            return;
+        }
+        /* c->bucket is left untouched: no item is being fetched here. */
+        if (buckets[bucket] != c->gen) {
+            out_string(c, "ERROR_NOT_OWNER");
+            return;
+        }
+    }
+
+    if (c->lock.ilock == NULL) {
+        out_string(c, "CLIENT_ERROR no lock acquired");
+        return;
+    }
+
+    unlink_lock( c );
+    out_string(c, "OK");
+}
+
+/*
+ * Handle the "list" command.
+ *
+ * Only the managed-mode ownership checks are implemented so far. Because no
+ * listing is produced yet, the command answers "ERROR" once those checks
+ * pass, so that the client always receives a reply instead of waiting
+ * indefinitely.
+ */
+static void process_list_command( conn *c, token_t *tokens, const size_t ntokens ) {
+
+    assert( c != NULL );
+
+    /* Not needed until the listing itself is implemented. */
+    (void)tokens;
+    (void)ntokens;
+
+    if (settings.managed) {
+        int bucket = c->bucket;
+        if (bucket == -1) {
+            out_string(c, "CLIENT_ERROR no BG data in managed mode");
+            return;
+        }
+        /* Don't reset bucket here, since we're not actually going to get the
+         * item yet. */
+        if (buckets[bucket] != c->gen) {
+            out_string(c, "ERROR_NOT_OWNER");
+            return;
+        }
+    }
+
+    /* Unimplemented: reply the same way an unknown command would. */
+    out_string(c, "ERROR");
+}
+
 static void process_command(conn *c, char *command) {
 
     token_t tokens[MAX_TOKENS];
@@ -1904,6 +2159,12 @@
 #endif
     } else if ((ntokens == 3 || ntokens == 4) && 
(strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
         process_verbosity_command(c, tokens, ntokens);
+    } else if ((ntokens == 3 || ntokens == 4) && 
(strcmp(tokens[COMMAND_TOKEN].value, "lock") == 0 )) {
+       process_lock_command(c, tokens, ntokens);
+    } else if ((ntokens == 2) && (strcmp(tokens[COMMAND_TOKEN].value, 
"unlock") == 0 )) {
+       process_unlock_command(c, tokens, ntokens);
+    } else if ((ntokens == 2 || ntokens == 4) && 
(strcmp(tokens[COMMAND_TOKEN].value, "list") == 0 )) {
+       process_list_command(c, tokens, ntokens);
     } else {
         out_string(c, "ERROR");
     }
@@ -2262,6 +2523,31 @@
             conn_set_state(c, conn_closing);
             break;
 
+       case conn_wait:
+            /*  now try reading from the socket */
+            res = read(c->sfd, c->rbuf, c->rsize );
+            if (res > 0) {
+                STATS_LOCK();
+                stats.bytes_read += res;
+                STATS_UNLOCK();
+                break;
+            }
+            if (res == 0) { /* end of stream */
+                conn_set_state(c, conn_closing);
+                break;
+            }
+            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+                if (!update_event(c, EV_READ | EV_PERSIST)) {
+                    if (settings.verbose > 0)
+                        fprintf(stderr, "Couldn't update event\n");
+                    conn_set_state(c, conn_closing);
+                    break;
+                }
+                stop = true;
+                break;
+            }
+           break;
+
         case conn_swallow:
             /* we are reading sbytes and throwing them away */
             if (c->sbytes == 0) {
@@ -2398,6 +2684,9 @@
         return;
     }
 
+    if (settings.verbose > 0)
+      fprintf(stderr, ">%d event_handler: %d\n", fd, which );
+
     drive_machine(c);
 
     /* wait for next event */
diff -Naur memcached-1.2.6/memcached.h memcached-1.2.6p/memcached.h
--- memcached-1.2.6/memcached.h 2008-07-29 12:37:27.000000000 -0400
+++ memcached-1.2.6p/memcached.h        2008-10-02 10:19:37.000000000 -0400
@@ -107,6 +107,8 @@
 /* temp */
 #define ITEM_SLABBED 4
 
+typedef struct conn conn;
+
 typedef struct _stritem {
     struct _stritem *next;
     struct _stritem *prev;
@@ -120,6 +122,7 @@
     uint8_t         slabs_clsid;/* which slab class we're in */
     uint8_t         nkey;       /* key length, w/terminating null and padding 
*/
     uint64_t        cas_id;     /* the CAS identifier */
+    conn            *lock;      /* The connection that has a lock on this 
item. */
     void * end[];
     /* then null-terminated key */
     /* then " flags length\r\n" (no terminating null) */
@@ -134,13 +137,14 @@
 #define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 + 
(item)->nsuffix + (item)->nbytes)
 
 enum conn_states {
-    conn_listening,  /** the socket which listens for connections */
-    conn_read,       /** reading in a command line */
-    conn_write,      /** writing out a simple response */
-    conn_nread,      /** reading in a fixed number of bytes */
-    conn_swallow,    /** swallowing unnecessary bytes w/o storing */
-    conn_closing,    /** closing this connection */
-    conn_mwrite,     /** writing out many items sequentially */
+    conn_listening,  /** =0 the socket which listens for connections */
+    conn_read,       /** =1 reading in a command line */
+    conn_write,      /** =2 writing out a simple response */
+    conn_nread,      /** =3 reading in a fixed number of bytes */
+    conn_swallow,    /** =4 swallowing unnecessary bytes w/o storing */
+    conn_closing,    /** =5 closing this connection */
+    conn_mwrite,     /** =6 writing out many items sequentially */
+    conn_wait,       /** =7 connection is waiting for a lock */
 };
 
 #define NREAD_ADD 1
@@ -150,7 +154,6 @@
 #define NREAD_PREPEND 5
 #define NREAD_CAS 6
 
-typedef struct conn conn;
 struct conn {
     int    sfd;
     int    state;
@@ -222,6 +225,14 @@
     int    gen;       /* generation requested for the bucket */
     bool   noreply;   /* True if the reply should not be sent. */
     conn   *next;     /* Used for generating a list of conn structures */
+
+    /* data for item locking */
+    struct 
+    {
+      item *ilock;
+      conn *next;
+      conn *prev;
+    } lock;
 };
 
 /* number of virtual buckets for a managed instance */
@@ -241,8 +252,10 @@
 char *do_defer_delete(item *item, time_t exptime);
 void do_run_deferred_deletes(void);
 char *do_add_delta(conn *c, item *item, const bool incr, const int64_t delta,
-                   char *buf);
-int do_store_item(item *item, int comm);
+                  char *buf);
+int do_store_item(item *item, conn *c);
+int do_item_lock( item *it, conn *c );
+void do_item_unlock( item *it, conn *c );
 conn *conn_new(const int sfd, const int init_state, const int event_flags, 
const int read_buffer_size, const bool is_udp, struct event_base *base);
 
 
@@ -299,7 +312,7 @@
 char *mt_slabs_stats(int *buflen);
 void  mt_stats_lock(void);
 void  mt_stats_unlock(void);
-int   mt_store_item(item *item, int comm);
+int   mt_store_item(item *item, conn *c);
 
 
 # define add_delta(c,x,y,z,a)        mt_add_delta(c,x,y,z,a)
@@ -321,6 +334,8 @@
 # define item_stats_sizes(x)         mt_item_stats_sizes(x)
 # define item_update(x)              mt_item_update(x)
 # define item_unlink(x)              mt_item_unlink(x)
+# define item_lock(x,c)              mt_item_lock(x,c)
+# define item_unlock(x,c)            mt_item_unlock(x,c)
 # define run_deferred_deletes()      mt_run_deferred_deletes()
 # define slabs_alloc(x,y)            mt_slabs_alloc(x,y)
 # define slabs_free(x,y,z)           mt_slabs_free(x,y,z)
@@ -354,6 +369,8 @@
 # define item_stats_sizes(x)         do_item_stats_sizes(x)
 # define item_unlink(x)              do_item_unlink(x)
 # define item_update(x)              do_item_update(x)
+# define item_lock(x,c)              do_item_lock(x,c)
+# define item_unlock(x,c)            do_item_unlock(x,c)
 # define run_deferred_deletes()      do_run_deferred_deletes()
 # define slabs_alloc(x,y)            do_slabs_alloc(x,y)
 # define slabs_free(x,y,z)           do_slabs_free(x,y,z)
diff -Naur memcached-1.2.6/thread.c memcached-1.2.6p/thread.c
--- memcached-1.2.6/thread.c    2008-07-29 12:37:27.000000000 -0400
+++ memcached-1.2.6p/thread.c   2008-10-02 10:19:37.000000000 -0400
@@ -510,11 +510,11 @@
 /*
  * Stores an item in the cache (high level, obeys set/add/replace semantics)
  */
-int mt_store_item(item *item, int comm) {
+int mt_store_item(item *item, conn *c) {
     int ret;
 
     pthread_mutex_lock(&cache_lock);
-    ret = do_store_item(item, comm);
+    ret = do_store_item(item, c);
     pthread_mutex_unlock(&cache_lock);
     return ret;
 }
@@ -619,6 +619,23 @@
     pthread_mutex_unlock(&stats_lock);
 }
 
+/******************************* ITEM LOCKING ******************************/
+
+/*
+ * Threaded-build wrapper for do_item_lock(): runs it with stats_lock held
+ * and passes its result through (1 = lock acquired, 0 = queued).
+ *
+ * NOTE(review): this serializes on stats_lock rather than cache_lock even
+ * though item and connection lock fields are modified; confirm that is the
+ * intended mutex.
+ */
+int mt_item_lock( item *it, conn *c ) {
+    int acquired;
+
+    pthread_mutex_lock( &stats_lock );
+    acquired = do_item_lock( it, c );
+    pthread_mutex_unlock( &stats_lock );
+
+    return acquired;
+}
+
+/*
+ * Threaded-build wrapper for do_item_unlock(): runs it with stats_lock held.
+ *
+ * NOTE(review): this serializes on stats_lock rather than cache_lock even
+ * though item and connection lock fields are modified; confirm that is the
+ * intended mutex.
+ */
+void mt_item_unlock( item *it, conn *c ) {
+    pthread_mutex_lock( &stats_lock );
+    do_item_unlock( it, c );
+    pthread_mutex_unlock( &stats_lock );
+}
+
 /*
  * Initializes the thread subsystem, creating various worker threads.
  *

Reply via email to