It seems that a locking command is something that has long been
requested, but never needed enough to actually be implemented. At
GameLogic, I've created a set of mutually-exclusive, advisory locking
commands. It works like this:
'lock key-name' - locks the item and performs the equivalent of a
'gets' on the key.
'unlock' - drops the lock held by this connection (no key argument is
needed, since a connection holds at most one lock). If any other
clients are waiting for the lock, the key's value is transmitted to the
next client (FIFO).
'cas key-name' - updates and unlocks the item (checking the cas). If
any other clients are waiting for the lock, the key's new value is
transmitted to the next client (FIFO).
A given connection may only lock one key at a time (which keeps things
simple and prevents deadlocks). A second connection that requests the
lock will block until the item is unlocked. The locks are advisory: if
another client issues a 'get' or 'gets' on the key, it will be allowed
to proceed normally. However, the lock itself is released only by its
holder: via 'unlock', a successful 'cas', or closing the connection.
The implementation is currently missing:
'trylock' - command that attempts to lock, but will fail immediately if
the item is already locked.
We've been using this patch in production for about a month now at
GameLogic. I'm fairly confident that the functionality that is
implemented is reasonably bulletproof, at least in our limited usage of
memcached.
- Peter Kovacs
diff -Naur memcached-1.2.6/ChangeLog memcached-1.2.6p/ChangeLog
--- memcached-1.2.6/ChangeLog 2008-07-29 12:37:27.000000000 -0400
+++ memcached-1.2.6p/ChangeLog 2008-10-02 10:30:58.000000000 -0400
@@ -1,3 +1,8 @@
+2008-10-02
+
+ * Add support for mutually-exclusive advisory 'lock' command
+ (GameLogic.com)
+
2008-07-24 [Version 1.2.6-rc1 released]
* Add support for newer automake (Facebook)
diff -Naur memcached-1.2.6/assoc.h memcached-1.2.6p/assoc.h
--- memcached-1.2.6/assoc.h 2008-04-26 17:58:33.000000000 -0400
+++ memcached-1.2.6p/assoc.h 2008-10-02 10:19:37.000000000 -0400
@@ -1,3 +1,6 @@
+#ifndef __MEMCACHE_ASSOC_H__
+#define __MEMCACHE_ASSOC_H__
+
/* associative array */
void assoc_init(void);
item *assoc_find(const char *key, const size_t nkey);
@@ -5,3 +8,5 @@
void assoc_delete(const char *key, const size_t nkey);
void do_assoc_move_next_bucket(void);
uint32_t hash( const void *key, size_t length, const uint32_t initval);
+
+#endif
diff -Naur memcached-1.2.6/items.c memcached-1.2.6p/items.c
--- memcached-1.2.6/items.c 2008-07-29 12:37:27.000000000 -0400
+++ memcached-1.2.6p/items.c 2008-10-02 10:19:37.000000000 -0400
@@ -1,6 +1,8 @@
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/* vim:set sts=4 sw=4 */
/* $Id$ */
#include "memcached.h"
+#include "items.h"
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/signal.h>
@@ -53,18 +55,6 @@
return ++cas_id;
}
-/* Enable this for reference-count debugging. */
-#if 0
-# define DEBUG_REFCNT(it,op) \
- fprintf(stderr, "item %x refcnt(%c) %d %c%c%c\n", \
- it, op, it->refcount, \
- (it->it_flags & ITEM_LINKED) ? 'L' : ' ', \
- (it->it_flags & ITEM_SLABBED) ? 'S' : ' ', \
- (it->it_flags & ITEM_DELETED) ? 'D' : ' ')
-#else
-# define DEBUG_REFCNT(it,op) while(0)
-#endif
-
/**
* Generates the variable-sized part of the header for an object.
*
@@ -156,6 +146,7 @@
it->exptime = exptime;
memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
it->nsuffix = nsuffix;
+ it->lock = 0;
return it;
}
@@ -291,9 +282,17 @@
}
int do_item_replace(item *it, item *new_it) {
+ conn *c;
+
MEMCACHED_ITEM_REPLACE(ITEM_key(it), it->nbytes,
- ITEM_key(new_it), new_it->nbytes);
+ ITEM_key(new_it), new_it->nbytes);
assert((it->it_flags & ITEM_SLABBED) == 0);
+ assert( new_it->lock == NULL );
+
+ new_it->lock = it->lock;
+ for( c = new_it->lock; c; c = c->lock.next )
+ c->lock.ilock = new_it;
+ it->lock = NULL;
do_item_unlink(it);
return do_item_link(new_it);
@@ -421,7 +420,10 @@
item *do_item_get_notedeleted(const char *key, const size_t nkey, bool
*delete_locked) {
item *it = assoc_find(key, nkey);
if (delete_locked) *delete_locked = false;
- if (it != NULL && (it->it_flags & ITEM_DELETED)) {
+
+ /* don't allow locked items to be deleted. */
+
+ if (it != NULL && it->lock == NULL && (it->it_flags & ITEM_DELETED)) {
/* it's flagged as delete-locked. let's see if that condition
is past due, and the 5-second delete_timer just hasn't
gotten to it yet... */
@@ -430,12 +432,14 @@
it = NULL;
}
}
- if (it != NULL && settings.oldest_live != 0 && settings.oldest_live <=
current_time &&
+ if (it != NULL && it->lock == NULL &&
+ settings.oldest_live != 0 && settings.oldest_live <= current_time &&
it->time <= settings.oldest_live) {
do_item_unlink(it); /* MTSAFE - cache_lock held */
it = NULL;
}
- if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
+ if (it != NULL && it->lock == NULL &&
+ it->exptime != 0 && it->exptime <= current_time) {
do_item_unlink(it); /* MTSAFE - cache_lock held */
it = NULL;
}
@@ -486,3 +490,4 @@
}
}
}
+
diff -Naur memcached-1.2.6/items.h memcached-1.2.6p/items.h
--- memcached-1.2.6/items.h 2008-04-27 20:37:56.000000000 -0400
+++ memcached-1.2.6p/items.h 2008-10-02 10:19:37.000000000 -0400
@@ -1,4 +1,39 @@
+#ifndef __MEMCACHE_ITEM_H__
+#define __MEMCACHE_ITEM_H__
+
+/* Enable this for reference-count debugging. */
+#if 0
+# define DEBUG_REFCNT(it,op) \
+ fprintf(stderr, "item %x refcnt(%c) %d %c%c%c\n", \
+ it, op, it->refcount, \
+ (it->it_flags & ITEM_LINKED) ? 'L' : ' ', \
+ (it->it_flags & ITEM_SLABBED) ? 'S' : ' ', \
+ (it->it_flags & ITEM_DELETED) ? 'D' : ' ')
+#else
+# define DEBUG_REFCNT(it,op) while(0)
+#endif
+
+#if 0
+# define DEBUG_LOCK( it ) \
+ do \
+ { \
+ conn *xxx_; \
+ fprintf( stderr, "ITEM %p LOCK ", it ); \
+ for( xxx_ = it->lock; xxx_; xxx_ = xxx_->lock.next ) \
+ { \
+ fprintf( stderr, "->%p", xxx_ ); \
+ if( xxx_->lock.next ) \
+ { assert( xxx_->lock.next->lock.prev == xxx_ ); } \
+ } \
+ fprintf( stderr, "\n" ); \
+ } while( 0 )
+#else
+# define DEBUG_LOCK( it ) while(0)
+#endif
+
+
/* See items.c */
+/** Initialize item table. */
void item_init(void);
/*@null@*/
item *do_item_alloc(char *key, const size_t nkey, const int flags, const
rel_time_t exptime, const int nbytes);
@@ -22,3 +57,5 @@
item *do_item_get_notedeleted(const char *key, const size_t nkey, bool
*delete_locked);
item *do_item_get_nocheck(const char *key, const size_t nkey);
+
+#endif
diff -Naur memcached-1.2.6/memcached.c memcached-1.2.6p/memcached.c
--- memcached-1.2.6/memcached.c 2008-07-29 12:37:27.000000000 -0400
+++ memcached-1.2.6p/memcached.c 2008-10-02 10:19:37.000000000 -0400
@@ -1,4 +1,5 @@
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/* vim:set sts=4 sw=4 */
/*
* memcached - memory caching daemon
*
@@ -370,6 +371,11 @@
c->noreply = false;
+ /* Initially, we're not locking anything. */
+ c->lock.ilock = 0;
+ c->lock.next = 0;
+ c->lock.prev = 0;
+
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
@@ -392,31 +398,60 @@
return c;
}
+/*
+ * Detach connection c from the item lock it holds or is queued on.
+ *
+ * If c is at the head of the item's lock list (it owns the lock), the
+ * lock is released through item_unlock().  Otherwise c is a waiter and
+ * is spliced out of the doubly-linked wait list.  In both cases c's own
+ * lock bookkeeping is cleared afterwards.  Does nothing when c is not
+ * associated with any item.
+ */
+static void unlink_lock( conn *c )
+{
+    item *locked = c->lock.ilock;
+
+    /* Not holding and not waiting: nothing to undo. */
+    if( locked == NULL )
+        return;
+
+    if( locked->lock != c ) {
+        /* A waiter is never first in the list, so it always has a
+         * predecessor to relink. */
+        conn *before;
+        conn *after;
+
+        DEBUG_LOCK( locked );
+
+        before = c->lock.prev;
+        after = c->lock.next;
+        assert( before != NULL );
+
+        before->lock.next = after;
+        if( after != NULL )
+            after->lock.prev = before;
+
+        DEBUG_LOCK( locked );
+    } else {
+        /* We are the connection actually holding the lock. */
+        item_unlock( locked, c );
+    }
+
+    c->lock.ilock = NULL;
+    c->lock.next = NULL;
+    c->lock.prev = NULL;
+}
+
static void conn_cleanup(conn *c) {
assert(c != NULL);
+ unlink_lock( c );
+
if (c->item) {
- item_remove(c->item);
- c->item = 0;
+ item_remove(c->item);
+ c->item = 0;
}
if (c->ileft != 0) {
- for (; c->ileft > 0; c->ileft--,c->icurr++) {
- item_remove(*(c->icurr));
- }
+ for (; c->ileft > 0; c->ileft--,c->icurr++) {
+ item_remove(*(c->icurr));
+ }
}
if (c->suffixleft != 0) {
- for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
- if(suffix_add_to_freelist(*(c->suffixcurr))) {
- free(*(c->suffixcurr));
- }
- }
+ for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
+ if(suffix_add_to_freelist(*(c->suffixcurr))) {
+ free(*(c->suffixcurr));
+ }
+ }
}
if (c->write_and_free) {
- free(c->write_and_free);
- c->write_and_free = 0;
+ free(c->write_and_free);
+ c->write_and_free = 0;
}
}
@@ -541,7 +576,7 @@
if (state == conn_read) {
conn_shrink(c);
assoc_move_next_bucket();
- }
+ }
c->state = state;
if (state == conn_write) {
@@ -777,7 +812,6 @@
assert(c != NULL);
item *it = c->item;
- int comm = c->item_comm;
int ret;
STATS_LOCK();
@@ -787,7 +821,7 @@
if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
out_string(c, "CLIENT_ERROR bad data chunk");
} else {
- ret = store_item(it, comm);
+ ret = store_item(it, c);
if (ret == 1) {
out_string(c, "STORED");
#ifdef HAVE_DTRACE
@@ -817,6 +851,8 @@
out_string(c, "EXISTS");
else if(ret == 3)
out_string(c, "NOT_FOUND");
+ else if(ret == 4)
+ out_string(c, "ERROR cas only for locked items");
else
out_string(c, "NOT_STORED");
}
@@ -831,7 +867,8 @@
*
* Returns true if the item was stored.
*/
-int do_store_item(item *it, int comm) {
+int do_store_item(item *it, conn *c) {
+ int command = c->item_comm;
char *key = ITEM_key(it);
bool delete_locked = false;
item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
@@ -840,18 +877,22 @@
item *new_it = NULL;
int flags;
- if (old_it != NULL && comm == NREAD_ADD) {
+ if (c->lock.ilock != old_it &&
+ command == NREAD_CAS )
+ { stored = 4; }
+
+ if (old_it != NULL && command == NREAD_ADD) {
/* add only adds a nonexistent item, but promote to head of LRU */
do_item_update(old_it);
- } else if (!old_it && (comm == NREAD_REPLACE
- || comm == NREAD_APPEND || comm == NREAD_PREPEND))
+ } else if (!old_it && (command == NREAD_REPLACE
+ || command == NREAD_APPEND || command == NREAD_PREPEND))
{
/* replace only replaces an existing value; don't store */
- } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
- || comm == NREAD_APPEND || comm == NREAD_PREPEND))
+ } else if (delete_locked && (command == NREAD_REPLACE || command ==
NREAD_ADD
+ || command == NREAD_APPEND || command == NREAD_PREPEND))
{
/* replace and add can't override delete locks; don't store */
- } else if (comm == NREAD_CAS) {
+ } else if (command == NREAD_CAS) {
/* validate cas operation */
if (delete_locked)
old_it = do_item_get_nocheck(key, it->nkey);
@@ -861,8 +902,15 @@
stored = 3;
}
else if(it->cas_id == old_it->cas_id) {
+ if (settings.verbose > 1)
+ fprintf(stderr, ">%d cas validates, lock %p\n", c->sfd,
it->lock);
+
// cas validates
do_item_replace(old_it, it);
+
+ if( it->lock )
+ item_unlock( it, c );
+
stored = 1;
}
else
@@ -875,7 +923,7 @@
* atomic and thread-safe.
*/
- if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
+ if (command == NREAD_APPEND || command == NREAD_PREPEND) {
/* we have it and old_it here - alloc memory to hold both */
/* flags was already lost - so recover them from ITEM_suffix(it) */
@@ -894,7 +942,7 @@
/* copy data from it and old_it to new_it */
- if (comm == NREAD_APPEND) {
+ if (command == NREAD_APPEND) {
memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */,
ITEM_data(it), it->nbytes);
} else {
@@ -929,6 +977,178 @@
return stored;
}
+/*
+ * Queue item `it` on connection `c` as one "VALUE ..." entry of a
+ * get/gets style reply.
+ *
+ *   it          item to send
+ *   c           connection whose iov, item and suffix lists are filled
+ *   num         slot in c->ilist (and c->suffixlist when return_cas is
+ *               true) that this item occupies
+ *   return_cas  true to include the item's cas id, as 'gets' does
+ *
+ * Returns  1 when the item was queued and stored in c->ilist[num],
+ *          0 when a list could not be grown or add_iov() failed,
+ *         -1 when no cas suffix buffer was available; in that case an
+ *            error reply has already been set up with out_string().
+ * On 0 and -1, item_remove(it) has been called on the item.
+ */
+static inline int send_item_to_conn( item *it, conn *c, int num, bool return_cas ) {
+    /* Make room for slot `num` in the connection's item list. */
+    if (num >= c->isize) {
+        item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
+        if (new_list) {
+            c->isize *= 2;
+            c->ilist = new_list;
+        } else {
+            item_remove( it );
+            return 0;
+        }
+    }
+
+    /*
+     * Construct the response. Each hit adds three elements to the
+     * outgoing data list:
+     *   "VALUE "
+     *   key
+     *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
+     */
+
+    if (return_cas == true)
+    {
+        MEMCACHED_COMMAND_GETS(c->sfd, ITEM_key(it), it->nbytes,
+                               it->cas_id);
+
+        /* Goofy mid-flight realloc. */
+        if (num >= c->suffixsize) {
+            char **new_suffix_list = realloc(c->suffixlist,
+                                             sizeof(char *) * c->suffixsize * 2);
+            if (new_suffix_list) {
+                c->suffixsize *= 2;
+                c->suffixlist = new_suffix_list;
+            } else {
+                item_remove( it );
+                return 0;
+            }
+        }
+
+        char *suffix = suffix_from_freelist();
+        if (suffix == NULL) {
+            out_string(c, "SERVER_ERROR out of memory making CAS suffix");
+            item_remove( it );
+            return -1;
+        }
+        *(c->suffixlist + num) = suffix;
+        /* cas_id is a uint64_t; cast so the argument matches %llu. */
+        sprintf(suffix, " %llu\r\n", (unsigned long long)it->cas_id);
+        if (add_iov(c, "VALUE ", 6) != 0 ||
+            add_iov(c, ITEM_key(it), it->nkey) != 0 ||
+            add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
+            add_iov(c, suffix, strlen(suffix)) != 0 ||
+            add_iov(c, ITEM_data(it), it->nbytes) != 0)
+        {
+            item_remove( it );
+            return 0;
+        }
+    }
+    else
+    {
+        MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nbytes);
+
+        if (add_iov(c, "VALUE ", 6) != 0 ||
+            add_iov(c, ITEM_key(it), it->nkey) != 0 ||
+            add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
+        {
+            item_remove(it);
+            return 0;
+        }
+    }
+
+
+    if (settings.verbose > 1)
+        fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
+
+    item_update(it);
+    *(c->ilist + num) = it;
+
+    return 1;
+}
+
+
+/*
+ * Acquire the advisory lock on `it` for connection `c`, or queue `c`
+ * behind the current holder.
+ *
+ * This function does no locking of its own.  In multi-threaded builds
+ * it is reached through mt_item_lock(), which serializes calls on
+ * stats_lock (not on the cache lock).
+ *
+ * `c` must not already hold or wait on a lock (asserted).
+ *
+ * Returns 1 if `c` now holds the lock.
+ * Returns 0 if the item was already locked; `c` has then been appended
+ * to the tail of the item's FIFO wait list.
+ */
+int do_item_lock( item *it, conn *c ) {
+    conn *tail;
+
+    /* The key is null-terminated; ITEM_key() gives it the char * type
+     * that %s requires. */
+    if (settings.verbose > 1)
+        fprintf( stderr, "%d locking item %s\n", c->sfd, ITEM_key(it) );
+
+    assert( c->lock.ilock == NULL );
+    assert( c->lock.prev == NULL );
+    assert( c->lock.next == NULL );
+
+    c->lock.ilock = it;
+
+    if( it->lock == NULL )
+    {
+        /* Uncontended: c becomes the head of the list, i.e. the owner. */
+        it->lock = c;
+
+        assert( it->lock->lock.prev == NULL );
+        assert( it->lock->lock.next == NULL );
+
+        DEBUG_LOCK( it );
+
+        return 1;
+    }
+
+    /* Contended: walk to the last waiter and hook c in behind it. */
+    for( tail = it->lock; tail->lock.next; tail = tail->lock.next )
+        ;
+
+    c->lock.prev = tail;
+    tail->lock.next = c;
+
+    DEBUG_LOCK( it );
+
+    return 0;
+}
+
+/*
+ * Release the advisory lock that connection `c` holds on `it` and hand
+ * it to the next waiter, if any.
+ *
+ * `c` must be the head of the item's lock list (asserted).  The next
+ * connection in the list becomes the owner and is sent the item in
+ * 'gets' format followed by "END", after which it is switched to
+ * conn_mwrite and registered for write events.
+ *
+ * NOTE(review): the suffix buffer send_item_to_conn() stores in
+ * next->suffixlist[0] is not counted in next->suffixleft (set to 0
+ * here) -- confirm it is returned to the freelist somewhere.
+ */
+void do_item_unlock( item *it, conn *c ) {
+
+    /* The key is null-terminated; ITEM_key() gives it the char * type
+     * that %s requires. */
+    if (settings.verbose > 1)
+        fprintf( stderr, "%d unlocking item %s\n", c->sfd, ITEM_key(it) );
+
+    // We have to be the connection locking this item.
+    assert( it->lock == c );
+    // This has to be the item we're locking.
+    assert( c->lock.ilock == it );
+    // This connection should be at the front of the locking list.
+    assert( c->lock.prev == NULL );
+
+    c->lock.ilock = NULL;
+    it->lock = c->lock.next;
+    c->lock.next = NULL;
+
+    if (it->lock) {
+        conn *next = it->lock;
+        next->lock.prev = NULL;
+
+        /* -1 means an error reply was already queued on `next`. */
+        if( send_item_to_conn( it, next, 0, true ) >= 0 )
+        {
+            next->icurr = next->ilist;
+            next->ileft = 0;
+            next->suffixcurr = next->suffixlist;
+            next->suffixleft = 0;
+
+            if ( add_iov(next, "END\r\n", 5) != 0
+                 || (next->udp && build_udp_headers(next) != 0)) {
+                out_string(next, "SERVER_ERROR out of memory writing get response");
+            }
+            else {
+                conn_set_state(next, conn_mwrite);
+                next->msgcurr = 0;
+
+                if (!update_event(next, EV_WRITE | EV_PERSIST)) {
+                    if (settings.verbose > 0)
+                        fprintf(stderr, "Couldn't update event\n");
+                    /* The event that failed belongs to the waiter, so
+                     * the waiter is the connection to close, not the
+                     * one releasing the lock. */
+                    conn_set_state(next, conn_closing);
+                }
+            }
+        }
+    }
+
+    DEBUG_LOCK( it );
+}
+
+
typedef struct token_s {
char *value;
size_t length;
@@ -1274,88 +1494,21 @@
stats_prefix_record_get(key, NULL != it);
}
if (it) {
- if (i >= c->isize) {
- item **new_list = realloc(c->ilist, sizeof(item *) *
c->isize * 2);
- if (new_list) {
- c->isize *= 2;
- c->ilist = new_list;
- } else {
- item_remove(it);
- break;
- }
- }
-
- /*
- * Construct the response. Each hit adds three elements to the
- * outgoing data list:
- * "VALUE "
- * key
- * " " + flags + " " + data length + "\r\n" + data (with
\r\n)
- */
-
- if(return_cas == true)
- {
- MEMCACHED_COMMAND_GETS(c->sfd, ITEM_key(it), it->nbytes,
- it->cas_id);
- /* Goofy mid-flight realloc. */
- if (i >= c->suffixsize) {
- char **new_suffix_list = realloc(c->suffixlist,
- sizeof(char *) * c->suffixsize * 2);
- if (new_suffix_list) {
- c->suffixsize *= 2;
- c->suffixlist = new_suffix_list;
- } else {
- item_remove(it);
- break;
- }
- }
-
- suffix = suffix_from_freelist();
- if (suffix == NULL) {
- STATS_LOCK();
- stats.get_cmds += stats_get_cmds;
- stats.get_hits += stats_get_hits;
- stats.get_misses += stats_get_misses;
- STATS_UNLOCK();
- out_string(c, "SERVER_ERROR out of memory making CAS
suffix");
- item_remove(it);
- return;
- }
- *(c->suffixlist + i) = suffix;
- sprintf(suffix, " %llu\r\n", it->cas_id);
- if (add_iov(c, "VALUE ", 6) != 0 ||
- add_iov(c, ITEM_key(it), it->nkey) != 0 ||
- add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
- add_iov(c, suffix, strlen(suffix)) != 0 ||
- add_iov(c, ITEM_data(it), it->nbytes) != 0)
- {
- item_remove(it);
- break;
- }
- }
- else
- {
- MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nbytes);
-
- if (add_iov(c, "VALUE ", 6) != 0 ||
- add_iov(c, ITEM_key(it), it->nkey) != 0 ||
- add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) !=
0)
- {
- item_remove(it);
- break;
- }
- }
-
-
- if (settings.verbose > 1)
- fprintf(stderr, ">%d sending key %s\n", c->sfd,
ITEM_key(it));
+ int send_result = send_item_to_conn( it, c, i, return_cas );
+ if (send_result == -1) {
+ STATS_LOCK();
+ stats.get_cmds += stats_get_cmds;
+ stats.get_hits += stats_get_hits;
+ stats.get_misses += stats_get_misses;
+ STATS_UNLOCK();
+ return;
+ } else if (send_result == 0 )
+ break;
+
/* item_get() has incremented it->refcount for us */
stats_get_hits++;
- item_update(it);
- *(c->ilist + i) = it;
i++;
-
} else {
stats_get_misses++;
if (return_cas) {
@@ -1543,6 +1696,10 @@
out_string(c, "NOT_FOUND");
return;
}
+ else if( it == c->lock.ilock ) {
+ out_string(c, "ERROR cas only for locked items");
+ return;
+ }
out_string(c, add_delta(c, it, incr, delta, temp));
item_remove(it); /* release our reference */
@@ -1648,7 +1805,9 @@
it = item_get(key, nkey);
if (it) {
MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), exptime);
- if (exptime == 0) {
+ if (it == c->lock.ilock )
+ out_string(c, "ERROR locked items may not be deleted." );
+ else if (exptime == 0) {
item_unlink(it);
item_remove(it); /* release our reference */
out_string(c, "DELETED");
@@ -1704,6 +1863,102 @@
return;
}
+/*
+ * Handle the 'lock <key>' command.
+ *
+ * Replies "END" when the key does not exist.  When the item exists and
+ * is unlocked, the connection takes the lock and the item is returned
+ * through process_get_command() with cas ids enabled.  When the item is
+ * already locked, the connection is queued on it and moved to the
+ * conn_wait state; no reply is sent at this point.
+ *
+ * Errors: a connection already holding or waiting on a lock gets
+ * "CLIENT_ERROR already locked"; an over-long key gets
+ * "CLIENT_ERROR bad command line format"; managed-mode bucket checks
+ * reply as shown below.
+ */
+static void process_lock_command(conn *c, token_t *tokens, const size_t ntokens ) {
+    token_t *wanted = &tokens[KEY_TOKEN];
+    item *target;
+
+    assert(c != NULL);
+
+    if (settings.managed) {
+        const int bkt = c->bucket;
+
+        if (bkt == -1) {
+            out_string(c, "CLIENT_ERROR no BG data in managed mode");
+            return;
+        }
+        /* The bucket is left set: the item has not been fetched yet. */
+        if (buckets[bkt] != c->gen) {
+            out_string(c, "ERROR_NOT_OWNER");
+            return;
+        }
+    }
+
+    /* One lock (held or pending) per connection. */
+    if (c->lock.ilock != NULL) {
+        out_string(c, "CLIENT_ERROR already locked");
+        return;
+    }
+
+    if (wanted->length > KEY_MAX_LENGTH) {
+        out_string(c, "CLIENT_ERROR bad command line format");
+        return;
+    }
+
+    target = item_get(wanted->value, wanted->length);
+    if (target == NULL) {
+        out_string(c, "END");
+        return;
+    }
+
+    if (item_lock( target, c ) == 0) {
+        /* Somebody else owns the lock; park until it is handed over. */
+        conn_set_state(c, conn_wait);
+        return;
+    }
+
+    process_get_command( c, tokens, ntokens, true );
+}
+
+/*
+ * Handle the 'unlock' command: drop whatever lock this connection
+ * holds (or is queued for) and reply "OK".
+ *
+ * Replies "CLIENT_ERROR no lock acquired" when the connection is not
+ * associated with any item.  `tokens` and `ntokens` are not consulted.
+ */
+static void process_unlock_command( conn *c, token_t *tokens, const size_t ntokens ) {
+
+    assert(c != NULL);
+
+    if (settings.managed) {
+        if (c->bucket == -1) {
+            out_string(c, "CLIENT_ERROR no BG data in managed mode");
+            return;
+        }
+        /* The bucket is left set; no item is fetched here. */
+        if (buckets[c->bucket] != c->gen) {
+            out_string(c, "ERROR_NOT_OWNER");
+            return;
+        }
+    }
+
+    if (c->lock.ilock != NULL) {
+        unlink_lock( c );
+        out_string(c, "OK");
+    } else {
+        out_string(c, "CLIENT_ERROR no lock acquired");
+    }
+}
+
+/*
+ * Handle the 'list' command.
+ *
+ * The command is a stub: beyond the managed-mode bucket checks it has
+ * no implementation.  Every path must still produce a reply, otherwise
+ * the client waits forever, so the command answers with the same
+ * "ERROR" that unrecognized commands receive.  `tokens` and `ntokens`
+ * are not consulted.
+ */
+static void process_list_command( conn *c, token_t *tokens, const size_t ntokens ) {
+
+    assert( c != NULL );
+
+    if (settings.managed) {
+        int bucket = c->bucket;
+        if (bucket == -1) {
+            out_string(c, "CLIENT_ERROR no BG data in managed mode");
+            return;
+        }
+        /* Don't reset bucket here, since we're not actually going to get the
+         * item yet. */
+        if (buckets[bucket] != c->gen) {
+            out_string(c, "ERROR_NOT_OWNER");
+            return;
+        }
+    }
+
+    /* Not implemented yet: reply like an unknown command. */
+    out_string(c, "ERROR");
+}
+
static void process_command(conn *c, char *command) {
token_t tokens[MAX_TOKENS];
@@ -1904,6 +2159,12 @@
#endif
} else if ((ntokens == 3 || ntokens == 4) &&
(strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
process_verbosity_command(c, tokens, ntokens);
+ } else if ((ntokens == 3 || ntokens == 4) &&
(strcmp(tokens[COMMAND_TOKEN].value, "lock") == 0 )) {
+ process_lock_command(c, tokens, ntokens);
+ } else if ((ntokens == 2) && (strcmp(tokens[COMMAND_TOKEN].value,
"unlock") == 0 )) {
+ process_unlock_command(c, tokens, ntokens);
+ } else if ((ntokens == 2 || ntokens == 4) &&
(strcmp(tokens[COMMAND_TOKEN].value, "list") == 0 )) {
+ process_list_command(c, tokens, ntokens);
} else {
out_string(c, "ERROR");
}
@@ -2262,6 +2523,31 @@
conn_set_state(c, conn_closing);
break;
+ case conn_wait:
+ /* now try reading from the socket */
+ res = read(c->sfd, c->rbuf, c->rsize );
+ if (res > 0) {
+ STATS_LOCK();
+ stats.bytes_read += res;
+ STATS_UNLOCK();
+ break;
+ }
+ if (res == 0) { /* end of stream */
+ conn_set_state(c, conn_closing);
+ break;
+ }
+ if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ if (!update_event(c, EV_READ | EV_PERSIST)) {
+ if (settings.verbose > 0)
+ fprintf(stderr, "Couldn't update event\n");
+ conn_set_state(c, conn_closing);
+ break;
+ }
+ stop = true;
+ break;
+ }
+ break;
+
case conn_swallow:
/* we are reading sbytes and throwing them away */
if (c->sbytes == 0) {
@@ -2398,6 +2684,9 @@
return;
}
+ if (settings.verbose > 0)
+ fprintf(stderr, ">%d event_handler: %d\n", fd, which );
+
drive_machine(c);
/* wait for next event */
diff -Naur memcached-1.2.6/memcached.h memcached-1.2.6p/memcached.h
--- memcached-1.2.6/memcached.h 2008-07-29 12:37:27.000000000 -0400
+++ memcached-1.2.6p/memcached.h 2008-10-02 10:19:37.000000000 -0400
@@ -107,6 +107,8 @@
/* temp */
#define ITEM_SLABBED 4
+typedef struct conn conn;
+
typedef struct _stritem {
struct _stritem *next;
struct _stritem *prev;
@@ -120,6 +122,7 @@
uint8_t slabs_clsid;/* which slab class we're in */
uint8_t nkey; /* key length, w/terminating null and padding
*/
uint64_t cas_id; /* the CAS identifier */
+ conn *lock; /* The connection that has a lock on this
item. */
void * end[];
/* then null-terminated key */
/* then " flags length\r\n" (no terminating null) */
@@ -134,13 +137,14 @@
#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 +
(item)->nsuffix + (item)->nbytes)
enum conn_states {
- conn_listening, /** the socket which listens for connections */
- conn_read, /** reading in a command line */
- conn_write, /** writing out a simple response */
- conn_nread, /** reading in a fixed number of bytes */
- conn_swallow, /** swallowing unnecessary bytes w/o storing */
- conn_closing, /** closing this connection */
- conn_mwrite, /** writing out many items sequentially */
+ conn_listening, /** =0 the socket which listens for connections */
+ conn_read, /** =1 reading in a command line */
+ conn_write, /** =2 writing out a simple response */
+ conn_nread, /** =3 reading in a fixed number of bytes */
+ conn_swallow, /** =4 swallowing unnecessary bytes w/o storing */
+ conn_closing, /** =5 closing this connection */
+ conn_mwrite, /** =6 writing out many items sequentially */
+ conn_wait, /** =7 connection is waiting for a lock */
};
#define NREAD_ADD 1
@@ -150,7 +154,6 @@
#define NREAD_PREPEND 5
#define NREAD_CAS 6
-typedef struct conn conn;
struct conn {
int sfd;
int state;
@@ -222,6 +225,14 @@
int gen; /* generation requested for the bucket */
bool noreply; /* True if the reply should not be sent. */
conn *next; /* Used for generating a list of conn structures */
+
+ /* data for item locking */
+ struct
+ {
+ item *ilock;
+ conn *next;
+ conn *prev;
+ } lock;
};
/* number of virtual buckets for a managed instance */
@@ -241,8 +252,10 @@
char *do_defer_delete(item *item, time_t exptime);
void do_run_deferred_deletes(void);
char *do_add_delta(conn *c, item *item, const bool incr, const int64_t delta,
- char *buf);
-int do_store_item(item *item, int comm);
+ char *buf);
+int do_store_item(item *item, conn *c);
+int do_item_lock( item *it, conn *c );
+void do_item_unlock( item *it, conn *c );
conn *conn_new(const int sfd, const int init_state, const int event_flags,
const int read_buffer_size, const bool is_udp, struct event_base *base);
@@ -299,7 +312,7 @@
char *mt_slabs_stats(int *buflen);
void mt_stats_lock(void);
void mt_stats_unlock(void);
-int mt_store_item(item *item, int comm);
+int mt_store_item(item *item, conn *c);
# define add_delta(c,x,y,z,a) mt_add_delta(c,x,y,z,a)
@@ -321,6 +334,8 @@
# define item_stats_sizes(x) mt_item_stats_sizes(x)
# define item_update(x) mt_item_update(x)
# define item_unlink(x) mt_item_unlink(x)
+# define item_lock(x,c) mt_item_lock(x,c)
+# define item_unlock(x,c) mt_item_unlock(x,c)
# define run_deferred_deletes() mt_run_deferred_deletes()
# define slabs_alloc(x,y) mt_slabs_alloc(x,y)
# define slabs_free(x,y,z) mt_slabs_free(x,y,z)
@@ -354,6 +369,8 @@
# define item_stats_sizes(x) do_item_stats_sizes(x)
# define item_unlink(x) do_item_unlink(x)
# define item_update(x) do_item_update(x)
+# define item_lock(x,c) do_item_lock(x,c)
+# define item_unlock(x,c) do_item_unlock(x,c)
# define run_deferred_deletes() do_run_deferred_deletes()
# define slabs_alloc(x,y) do_slabs_alloc(x,y)
# define slabs_free(x,y,z) do_slabs_free(x,y,z)
diff -Naur memcached-1.2.6/thread.c memcached-1.2.6p/thread.c
--- memcached-1.2.6/thread.c 2008-07-29 12:37:27.000000000 -0400
+++ memcached-1.2.6p/thread.c 2008-10-02 10:19:37.000000000 -0400
@@ -510,11 +510,11 @@
/*
* Stores an item in the cache (high level, obeys set/add/replace semantics)
*/
-int mt_store_item(item *item, int comm) {
+int mt_store_item(item *item, conn *c) {
int ret;
pthread_mutex_lock(&cache_lock);
- ret = do_store_item(item, comm);
+ ret = do_store_item(item, c);
pthread_mutex_unlock(&cache_lock);
return ret;
}
@@ -619,6 +619,23 @@
pthread_mutex_unlock(&stats_lock);
}
+/******************************* ITEM LOCKING ******************************/
+
+/*
+ * Multi-threaded wrapper for do_item_lock(): runs it with stats_lock
+ * held and returns its result unchanged.
+ *
+ * NOTE(review): the mutex used here is stats_lock, not cache_lock --
+ * presumably because callers may already hold cache_lock; confirm.
+ */
+int mt_item_lock( item *it, conn *c ) {
+    int acquired;
+
+    pthread_mutex_lock( &stats_lock );
+    acquired = do_item_lock( it, c );
+    pthread_mutex_unlock( &stats_lock );
+
+    return acquired;
+}
+
+/*
+ * Multi-threaded wrapper for do_item_unlock(): runs it with stats_lock
+ * held.
+ */
+void mt_item_unlock( item *it, conn *c ) {
+    pthread_mutex_t *guard = &stats_lock;
+
+    pthread_mutex_lock( guard );
+    do_item_unlock( it, c );
+    pthread_mutex_unlock( guard );
+}
+
/*
* Initializes the thread subsystem, creating various worker threads.
*