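From: Ripduman Sohan <[email protected]>

Changes:
1. Add a per-bucket rwlock to the assoc hash table to increase parallelism.
2. Convert cache_lock from a mutex to a rwlock: bucket accesses take it for reading, while hash-table expansion and bucket migration take it for writing, so expansion remains safe.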
Signed-off-by: Ripduman Sohan <[email protected]> --- engines/default_engine/assoc.c | 151 +++++++++++++++++++++---------- engines/default_engine/assoc.h | 17 +++-- engines/default_engine/default_engine.c | 4 +- engines/default_engine/default_engine.h | 2 +- engines/default_engine/items.c | 4 +- 5 files changed, 118 insertions(+), 60 deletions(-)

Reviewer notes (placed after the '---' line, so git am keeps them out of the commit message):

Purpose: each assoc hash bucket becomes a struct assoc_bucket { lock, head }.
get_assoc_bucket_ref() takes cache_lock for reading, then the bucket lock
(read or write); put_assoc_bucket_ref() releases them in reverse order.
assoc_expand() (called from assoc_insert() with cache_lock write-locked) and
assoc_maintenance_thread() hold cache_lock for writing, excluding all bucket
accesses while tables are swapped or buckets are migrated.

NOTE(review): assoc_create() stops at the first failing pthread_rwlock_init()
and returns 1. assoc_init() and assoc_expand() then call assoc_destroy() over
all nr_buckets, which runs pthread_rwlock_destroy() on locks that were never
initialized (undefined per POSIX). assoc_create() should destroy only the
locks it initialized, free the table itself and hand back NULL.

NOTE(review): assoc_insert() does engine->assoc.hash_items++ while holding
cache_lock only for reading plus one bucket's write lock. Inserts into
different buckets can therefore update the counter concurrently (data race,
lost updates). Any decrement in assoc_delete() (not visible in these hunks)
needs the same protection: an atomic op or a dedicated lock.

NOTE(review): after put_assoc_bucket_ref(), assoc_insert() reads
engine->assoc.expanding and engine->assoc.hash_items with no lock held. The
re-check under the cache_lock write lock tests only !expanding, not the
load-factor threshold. If another thread's expansion has already completed,
an unneeded second assoc_expand() can start. Re-check both conditions.

NOTE(review): assoc_maintenance_thread() no longer clears the migrated old
bucket. The removed line assigned NULL to what is now a struct; the
equivalent is engine->assoc.old_hashtable[engine->assoc.expand_bucket].head
= NULL. Without it the old bucket keeps pointers to items that are now
linked into primary_hashtable.

NOTE(review): old_hashtable now holds initialized rwlocks. Confirm that the
code releasing it when expansion finishes (outside these hunks) uses
assoc_destroy() rather than free().

NOTE(review): the comment above cache_lock in default_engine.h still says
"this single mutex". Any pthread_mutex_*() call on cache_lock outside the
converted hunks will no longer compile against pthread_rwlock_t; confirm
none remain.

NOTE(review): the loops in assoc_create()/assoc_destroy() compare int i with
uint32_t nr_buckets; use an unsigned index.

diff --git a/engines/default_engine/assoc.c b/engines/default_engine/assoc.c index dc4c09c..feeacd1 100644 --- a/engines/default_engine/assoc.c +++ b/engines/default_engine/assoc.c @@ -17,25 +17,82 @@ #define hashsize(n) ((uint32_t)1<<(n)) #define hashmask(n) (hashsize(n)-1) +static int assoc_create(struct assoc_bucket **bucket, uint32_t nr_buckets) +{ + int rc = 0; + struct assoc_bucket *ht = calloc(nr_buckets, sizeof(*ht)); + + if (ht) { + for (int i = 0; i < nr_buckets; i++) + if (pthread_rwlock_init(&ht[i].lock, NULL)) { + rc = 1; + break; + } + } + + *bucket = ht; + + return rc; +} + +static void assoc_destroy(struct assoc_bucket **bucket, uint32_t nr_buckets) +{ + struct assoc_bucket *ht = *bucket; + + if (ht) { + for (int i = 0; i < nr_buckets; i++) + pthread_rwlock_destroy(&ht[i].lock); + } + + free(ht); + + *bucket = NULL; +} + ENGINE_ERROR_CODE assoc_init(struct default_engine *engine) { - engine->assoc.primary_hashtable = calloc(hashsize(engine->assoc.hashpower), sizeof(void *)); + uint32_t nr_buckets = hashsize(engine->assoc.hashpower); + struct assoc_bucket *ht; + if (assoc_create(&ht, nr_buckets)) + assoc_destroy(&ht, nr_buckets); + engine->assoc.primary_hashtable = ht; return (engine->assoc.primary_hashtable != NULL) ?
ENGINE_SUCCESS : ENGINE_ENOMEM; } -hash_item *assoc_find(struct default_engine *engine, uint32_t hash, const char *key, const size_t nkey) { - hash_item *it; +static inline struct assoc_bucket *get_assoc_bucket_ref(struct default_engine *engine, + uint32_t hash, int wr_lock) +{ unsigned int oldbucket; + struct assoc_bucket *it; - pthread_mutex_lock(&engine->cache_lock); + pthread_rwlock_rdlock(&engine->cache_lock); if (engine->assoc.expanding && (oldbucket = (hash & hashmask(engine->assoc.hashpower - 1))) >= engine->assoc.expand_bucket) { - it = engine->assoc.old_hashtable[oldbucket]; + it = &engine->assoc.old_hashtable[oldbucket]; } else { - it = engine->assoc.primary_hashtable[hash & hashmask(engine->assoc.hashpower)]; + it = &engine->assoc.primary_hashtable[hash & hashmask(engine->assoc.hashpower)]; } + if (wr_lock) + pthread_rwlock_wrlock(&it->lock); + else + pthread_rwlock_rdlock(&it->lock); + + return it; +} + +static inline void put_assoc_bucket_ref(struct default_engine *engine, struct assoc_bucket *bucket) +{ + pthread_rwlock_unlock(&bucket->lock); + pthread_rwlock_unlock(&engine->cache_lock); +} + +hash_item *assoc_find(struct default_engine *engine, uint32_t hash, const char *key, const size_t nkey) { + + struct assoc_bucket *bucket = get_assoc_bucket_ref(engine, hash, 0); + + hash_item *it = bucket->head; hash_item *ret = NULL; int depth = 0; while (it) { @@ -47,27 +104,19 @@ hash_item *assoc_find(struct default_engine *engine, uint32_t hash, const char * ++depth; } MEMCACHED_ASSOC_FIND(key, nkey, depth); - pthread_mutex_unlock(&engine->cache_lock); + put_assoc_bucket_ref(engine, bucket); + return ret; } /* returns the address of the item pointer before the key.
if *item == 0, the item wasn't found */ -static hash_item** _hashitem_before(struct default_engine *engine, - uint32_t hash, +static inline hash_item** _hashitem_before(struct default_engine *engine, + struct assoc_bucket *bucket, const char *key, const size_t nkey) { - hash_item **pos; - unsigned int oldbucket; - - if (engine->assoc.expanding && - (oldbucket = (hash & hashmask(engine->assoc.hashpower - 1))) >= engine->assoc.expand_bucket) - { - pos = &engine->assoc.old_hashtable[oldbucket]; - } else { - pos = &engine->assoc.primary_hashtable[hash & hashmask(engine->assoc.hashpower)]; - } + hash_item **pos = &bucket->head; while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, item_get_key(*pos), nkey))) { pos = &(*pos)->h_next; @@ -79,9 +128,15 @@ static void *assoc_maintenance_thread(void *arg); /* grows the hashtable to the next power of 2. */ static void assoc_expand(struct default_engine *engine) { + struct assoc_bucket *ht; + uint32_t nr_buckets = hashsize(engine->assoc.hashpower + 1); + engine->assoc.old_hashtable = engine->assoc.primary_hashtable; - engine->assoc.primary_hashtable = calloc(hashsize(engine->assoc.hashpower + 1), sizeof(void *)); + if (assoc_create(&ht, nr_buckets)) + assoc_destroy(&ht, nr_buckets); + + engine->assoc.primary_hashtable = ht; if (engine->assoc.primary_hashtable) { engine->assoc.hashpower++; engine->assoc.expanding = true; @@ -103,7 +158,7 @@ static void assoc_expand(struct default_engine *engine) { "Can't create thread: %s\n", strerror(ret)); engine->assoc.hashpower--; engine->assoc.expanding = false; - free(engine->assoc.primary_hashtable); + assoc_destroy(&engine->assoc.primary_hashtable, nr_buckets); engine->assoc.primary_hashtable =engine->assoc.old_hashtable; } } else { @@ -113,38 +168,37 @@ static void assoc_expand(struct default_engine *engine) { } /* Note: this isn't an assoc_update.
The key must not already exist to call this */ -int assoc_insert(struct default_engine *engine, uint32_t hash, hash_item *it) { - unsigned int oldbucket; +int assoc_insert(struct default_engine *engine, hash_item *it) { + struct assoc_bucket *bucket; - assert(assoc_find(engine, hash, item_get_key(it), it->nkey) == 0); /* shouldn't have duplicately named things defined */ + assert(assoc_find(engine, it->hash, item_get_key(it), it->nkey) == 0); /* shouldn't have duplicately named things defined */ - pthread_mutex_lock(&engine->cache_lock); + bucket = get_assoc_bucket_ref(engine, it->hash, 1); - if (engine->assoc.expanding && - (oldbucket = (hash & hashmask(engine->assoc.hashpower - 1))) >= engine->assoc.expand_bucket) - { - it->h_next = engine->assoc.old_hashtable[oldbucket]; - engine->assoc.old_hashtable[oldbucket] = it; - } else { - it->h_next = engine->assoc.primary_hashtable[hash & hashmask(engine->assoc.hashpower)]; - engine->assoc.primary_hashtable[hash & hashmask(engine->assoc.hashpower)] = it; - } + it->h_next = bucket->head; + bucket->head = it; engine->assoc.hash_items++; - if (! engine->assoc.expanding && engine->assoc.hash_items > (hashsize(engine->assoc.hashpower) * 3) / 2) { - assoc_expand(engine); - } MEMCACHED_ASSOC_INSERT(item_get_key(it), it->nkey, engine->assoc.hash_items); - pthread_mutex_unlock(&engine->cache_lock); + + put_assoc_bucket_ref(engine, bucket); + + if (! engine->assoc.expanding && engine->assoc.hash_items > (hashsize(engine->assoc.hashpower) * 3) / 2) { + pthread_rwlock_wrlock(&engine->cache_lock); + if (!
engine->assoc.expanding) /* re-check expansion condition after lock acquisition */ + assoc_expand(engine); + pthread_rwlock_unlock(&engine->cache_lock); + } return 1; } -void assoc_delete(struct default_engine *engine, uint32_t hash, const char *key, const size_t nkey) { - pthread_mutex_lock(&engine->cache_lock); +void assoc_delete(struct default_engine *engine, hash_item *it) { + + struct assoc_bucket *bucket= get_assoc_bucket_ref(engine, it->hash, 1); - hash_item **before = _hashitem_before(engine, hash, key, nkey); + hash_item **before = _hashitem_before(engine, bucket, item_get_key(it), it->nkey); if (*before) { hash_item *nxt; @@ -156,10 +210,10 @@ void assoc_delete(struct default_engine *engine, uint32_t hash, const char *key, nxt = (*before)->h_next; (*before)->h_next = 0; /* probably pointless, but whatever. */ *before = nxt; - pthread_mutex_unlock(&engine->cache_lock); + put_assoc_bucket_ref(engine, bucket); return; } - pthread_mutex_unlock(&engine->cache_lock); + put_assoc_bucket_ref(engine, bucket); /* Note: we never actually get here. the callers don't delete things they can't find.
*/ assert(*before != 0); @@ -175,23 +229,22 @@ static void *assoc_maintenance_thread(void *arg) { bool done = false; do { int ii; - pthread_mutex_lock(&engine->cache_lock); + pthread_rwlock_wrlock(&engine->cache_lock); for (ii = 0; ii < hash_bulk_move && engine->assoc.expanding; ++ii) { hash_item *it, *next; int bucket; - for (it = engine->assoc.old_hashtable[engine->assoc.expand_bucket]; + for (it = engine->assoc.old_hashtable[engine->assoc.expand_bucket].head; NULL != it; it = next) { next = it->h_next; bucket = engine->server.core->hash(item_get_key(it), it->nkey, 0) & hashmask(engine->assoc.hashpower); - it->h_next = engine->assoc.primary_hashtable[bucket]; - engine->assoc.primary_hashtable[bucket] = it; + it->h_next = engine->assoc.primary_hashtable[bucket].head; + engine->assoc.primary_hashtable[bucket].head = it; } - engine->assoc.old_hashtable[engine->assoc.expand_bucket] = NULL; engine->assoc.expand_bucket++; if (engine->assoc.expand_bucket == hashsize(engine->assoc.hashpower - 1)) { engine->assoc.expanding = false; @@ -207,7 +260,7 @@ static void *assoc_maintenance_thread(void *arg) { if (!engine->assoc.expanding) { done = true; } - pthread_mutex_unlock(&engine->cache_lock); + pthread_rwlock_unlock(&engine->cache_lock); } while (!done); return NULL; diff --git a/engines/default_engine/assoc.h b/engines/default_engine/assoc.h index dc0bf60..47a2718 100644 --- a/engines/default_engine/assoc.h +++ b/engines/default_engine/assoc.h @@ -1,19 +1,26 @@ #ifndef ASSOC_H #define ASSOC_H +#include <pthread.h> + +struct assoc_bucket { + pthread_rwlock_t lock; + hash_item* head; +}; + struct assoc { /* how many powers of 2's worth of buckets we use */ unsigned int hashpower; /* Main hash table. This is where we look except during expansion. */ - hash_item** primary_hashtable; + struct assoc_bucket* primary_hashtable; /* * Previous hash table. During expansion, we look here for keys that haven't * been moved over to the primary yet.
*/ - hash_item** old_hashtable; + struct assoc_bucket* old_hashtable; /* Number of items in the hash table. */ unsigned int hash_items; @@ -32,10 +39,8 @@ struct assoc { ENGINE_ERROR_CODE assoc_init(struct default_engine *engine); hash_item *assoc_find(struct default_engine *engine, uint32_t hash, const char *key, const size_t nkey); -int assoc_insert(struct default_engine *engine, uint32_t hash, - hash_item *item); -void assoc_delete(struct default_engine *engine, uint32_t hash, - const char *key, const size_t nkey); +int assoc_insert(struct default_engine *engine, hash_item *item); +void assoc_delete(struct default_engine *engine, hash_item *item); int start_assoc_maintenance_thread(struct default_engine *engine); void stop_assoc_maintenance_thread(struct default_engine *engine); diff --git a/engines/default_engine/default_engine.c b/engines/default_engine/default_engine.c index b7827d0..3588e3b 100644 --- a/engines/default_engine/default_engine.c +++ b/engines/default_engine/default_engine.c @@ -196,7 +196,7 @@ ENGINE_ERROR_CODE create_instance(uint64_t interface, .slabs = { .lock = PTHREAD_MUTEX_INITIALIZER }, - .cache_lock = PTHREAD_MUTEX_INITIALIZER, + .cache_lock = PTHREAD_RWLOCK_INITIALIZER, .stats = { .lock = PTHREAD_MUTEX_INITIALIZER, }, @@ -290,7 +290,7 @@ static void default_destroy(ENGINE_HANDLE* handle, const bool force) { struct default_engine* se = get_handle(handle); if (se->initialized) { - pthread_mutex_destroy(&se->cache_lock); + pthread_rwlock_destroy(&se->cache_lock); pthread_mutex_destroy(&se->stats.lock); pthread_mutex_destroy(&se->slabs.lock); se->initialized = false; diff --git a/engines/default_engine/default_engine.h b/engines/default_engine/default_engine.h index 72ac7fd..8209038 100644 --- a/engines/default_engine/default_engine.h +++ b/engines/default_engine/default_engine.h @@ -125,7 +125,7 @@ struct default_engine { * The cache layer (item_* and assoc_*) is currently protected by * this single mutex */ - pthread_mutex_t cache_lock;
+ pthread_rwlock_t cache_lock; /* Used to arbitrate concurrent access/modification of items by * multiple threads */ diff --git a/engines/default_engine/items.c b/engines/default_engine/items.c index 86b78b9..befcc43 100644 --- a/engines/default_engine/items.c +++ b/engines/default_engine/items.c @@ -315,7 +315,7 @@ int do_item_link(struct default_engine *engine, hash_item *it) { assert(it->nbytes < (1024 * 1024)); /* 1MB max size */ it->iflag |= ITEM_LINKED; it->time = engine->server.core->get_current_time(); - assoc_insert(engine, it->hash, it); + assoc_insert(engine, it); pthread_mutex_lock(&engine->stats.lock); engine->stats.curr_bytes += ITEM_ntotal(engine, it); @@ -339,7 +339,7 @@ void do_item_unlink(struct default_engine *engine, hash_item *it) { engine->stats.curr_bytes -= ITEM_ntotal(engine, it); engine->stats.curr_items -= 1; pthread_mutex_unlock(&engine->stats.lock); - assoc_delete(engine, it->hash, item_get_key(it), it->nkey); + assoc_delete(engine, it); item_unlink_q(engine, it); if (it->refcount == 0) { item_free(engine, it); -- 1.7.1
