From: Ripduman Sohan <[email protected]>

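Changes:
1. Add a per-bucket rwlock so that operations on different buckets can
   proceed in parallel.
2. Make cache_lock a rwlock: hash table operations hold it for reading,
   while table expansion holds it for writing, which makes expansion safe.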

Signed-off-by: Ripduman Sohan <[email protected]>
---
 engines/default_engine/assoc.c          |  151 +++++++++++++++++++++----------
 engines/default_engine/assoc.h          |   17 +++--
 engines/default_engine/default_engine.c |    4 +-
 engines/default_engine/default_engine.h |    2 +-
 engines/default_engine/items.c          |    4 +-
 5 files changed, 118 insertions(+), 60 deletions(-)

diff --git a/engines/default_engine/assoc.c b/engines/default_engine/assoc.c
index dc4c09c..feeacd1 100644
--- a/engines/default_engine/assoc.c
+++ b/engines/default_engine/assoc.c
@@ -17,25 +17,82 @@
 #define hashsize(n) ((uint32_t)1<<(n))
 #define hashmask(n) (hashsize(n)-1)
 
+/* Allocate nr_buckets zeroed buckets and init each lock; on any failure
+ * release everything, store NULL in *bucket and return non-zero. */
+static int assoc_create(struct assoc_bucket **bucket, uint32_t nr_buckets)
+{
+    struct assoc_bucket *ht = calloc(nr_buckets, sizeof(*ht));
+    uint32_t i = 0;
+    while (ht && i < nr_buckets && !pthread_rwlock_init(&ht[i].lock, NULL))
+        i++;
+    if (ht && i < nr_buckets) {   /* init failed: undo only locks 0..i-1 */
+        while (i-- > 0)
+            pthread_rwlock_destroy(&ht[i].lock);
+        free(ht);
+        ht = NULL;
+    }
+    *bucket = ht;
+    return ht == NULL;
+}
+
+/* Destroy each bucket lock, free the table and NULL the caller's pointer.
+ * A NULL table is a no-op; all nr_buckets locks must be initialised. */
+static void assoc_destroy(struct assoc_bucket **bucket, uint32_t nr_buckets)
+{
+    struct assoc_bucket *ht = *bucket;
+
+    /* index typed like nr_buckets: no signed/unsigned comparison */
+    for (uint32_t i = 0; ht != NULL && i < nr_buckets; i++)
+        pthread_rwlock_destroy(&ht[i].lock);
+
+    free(ht);    /* free(NULL) is harmless */
+    *bucket = NULL;
+}
+
 ENGINE_ERROR_CODE assoc_init(struct default_engine *engine) {
-    engine->assoc.primary_hashtable = 
calloc(hashsize(engine->assoc.hashpower), sizeof(void *));
+    uint32_t nr_buckets = hashsize(engine->assoc.hashpower);
+    struct assoc_bucket *ht;
+    if (assoc_create(&ht, nr_buckets))
+        assoc_destroy(&ht, nr_buckets);
+    engine->assoc.primary_hashtable = ht;
     return (engine->assoc.primary_hashtable != NULL) ? ENGINE_SUCCESS : 
ENGINE_ENOMEM;
 }
 
-hash_item *assoc_find(struct default_engine *engine, uint32_t hash, const char 
*key, const size_t nkey) {
-    hash_item *it;
+static inline struct assoc_bucket *get_assoc_bucket_ref(struct default_engine 
*engine,
+                                                        uint32_t hash, int 
wr_lock)
+{
     unsigned int oldbucket;
+    struct assoc_bucket *it;
 
-    pthread_mutex_lock(&engine->cache_lock);
+    pthread_rwlock_rdlock(&engine->cache_lock);
 
     if (engine->assoc.expanding &&
         (oldbucket = (hash & hashmask(engine->assoc.hashpower - 1))) >= 
engine->assoc.expand_bucket)
     {
-        it = engine->assoc.old_hashtable[oldbucket];
+        it = &engine->assoc.old_hashtable[oldbucket];
     } else {
-        it = engine->assoc.primary_hashtable[hash & 
hashmask(engine->assoc.hashpower)];
+        it = &engine->assoc.primary_hashtable[hash & 
hashmask(engine->assoc.hashpower)];
     }
 
+    if (wr_lock)
+        pthread_rwlock_wrlock(&it->lock);
+    else
+        pthread_rwlock_rdlock(&it->lock);
+
+    return it;
+}
+
+static inline void put_assoc_bucket_ref(struct default_engine *engine, struct 
assoc_bucket *bucket)
+{   /* undo get_assoc_bucket_ref(): release both locks it acquired */
+    pthread_rwlock_unlock(&bucket->lock);       /* per-bucket lock first */
+    pthread_rwlock_unlock(&engine->cache_lock); /* then the table-wide cache_lock */
+}
+
+hash_item *assoc_find(struct default_engine *engine, uint32_t hash, const char 
*key, const size_t nkey) {
+
+    struct assoc_bucket *bucket = get_assoc_bucket_ref(engine, hash, 0);
+
+    hash_item *it = bucket->head;
     hash_item *ret = NULL;
     int depth = 0;
     while (it) {
@@ -47,27 +104,19 @@ hash_item *assoc_find(struct default_engine *engine, 
uint32_t hash, const char *
         ++depth;
     }
     MEMCACHED_ASSOC_FIND(key, nkey, depth);
-    pthread_mutex_unlock(&engine->cache_lock);
+    put_assoc_bucket_ref(engine, bucket);
+
     return ret;
 }
 
 /* returns the address of the item pointer before the key.  if *item == 0,
    the item wasn't found */
 
-static hash_item** _hashitem_before(struct default_engine *engine,
-                                    uint32_t hash,
+static inline hash_item** _hashitem_before(struct default_engine *engine,
+                                    struct assoc_bucket *bucket,
                                     const char *key,
                                     const size_t nkey) {
-    hash_item **pos;
-    unsigned int oldbucket;
-
-    if (engine->assoc.expanding &&
-        (oldbucket = (hash & hashmask(engine->assoc.hashpower - 1))) >= 
engine->assoc.expand_bucket)
-    {
-        pos = &engine->assoc.old_hashtable[oldbucket];
-    } else {
-        pos = &engine->assoc.primary_hashtable[hash & 
hashmask(engine->assoc.hashpower)];
-    }
+    hash_item **pos = &bucket->head;
 
     while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, item_get_key(*pos), 
nkey))) {
         pos = &(*pos)->h_next;
@@ -79,9 +128,15 @@ static void *assoc_maintenance_thread(void *arg);
 
 /* grows the hashtable to the next power of 2. */
 static void assoc_expand(struct default_engine *engine) {
+    struct assoc_bucket *ht;
+    uint32_t nr_buckets = hashsize(engine->assoc.hashpower + 1);
+
     engine->assoc.old_hashtable = engine->assoc.primary_hashtable;
 
-    engine->assoc.primary_hashtable = calloc(hashsize(engine->assoc.hashpower 
+ 1), sizeof(void *));
+    if (assoc_create(&ht, nr_buckets))
+        assoc_destroy(&ht, nr_buckets);
+
+    engine->assoc.primary_hashtable = ht;
     if (engine->assoc.primary_hashtable) {
         engine->assoc.hashpower++;
         engine->assoc.expanding = true;
@@ -103,7 +158,7 @@ static void assoc_expand(struct default_engine *engine) {
                         "Can't create thread: %s\n", strerror(ret));
             engine->assoc.hashpower--;
             engine->assoc.expanding = false;
-            free(engine->assoc.primary_hashtable);
+            assoc_destroy(&engine->assoc.primary_hashtable, nr_buckets);
             engine->assoc.primary_hashtable =engine->assoc.old_hashtable;
         }
     } else {
@@ -113,38 +168,37 @@ static void assoc_expand(struct default_engine *engine) {
 }
 
 /* Note: this isn't an assoc_update.  The key must not already exist to call 
this */
-int assoc_insert(struct default_engine *engine, uint32_t hash, hash_item *it) {
-    unsigned int oldbucket;
+int assoc_insert(struct default_engine *engine, hash_item *it) {
+    struct assoc_bucket *bucket;
 
-    assert(assoc_find(engine, hash, item_get_key(it), it->nkey) == 0);  /* 
shouldn't have duplicately named things defined */
+    assert(assoc_find(engine, it->hash, item_get_key(it), it->nkey) == 0);  /* 
shouldn't have duplicately named things defined */
 
-    pthread_mutex_lock(&engine->cache_lock);
+    bucket = get_assoc_bucket_ref(engine, it->hash, 1);
 
-    if (engine->assoc.expanding &&
-        (oldbucket = (hash & hashmask(engine->assoc.hashpower - 1))) >= 
engine->assoc.expand_bucket)
-    {
-        it->h_next = engine->assoc.old_hashtable[oldbucket];
-        engine->assoc.old_hashtable[oldbucket] = it;
-    } else {
-        it->h_next = engine->assoc.primary_hashtable[hash & 
hashmask(engine->assoc.hashpower)];
-        engine->assoc.primary_hashtable[hash & 
hashmask(engine->assoc.hashpower)] = it;
-    }
+    it->h_next = bucket->head;
+    bucket->head = it;
 
     engine->assoc.hash_items++;
-    if (! engine->assoc.expanding && engine->assoc.hash_items > 
(hashsize(engine->assoc.hashpower) * 3) / 2) {
-        assoc_expand(engine);
-    }
 
     MEMCACHED_ASSOC_INSERT(item_get_key(it), it->nkey, 
engine->assoc.hash_items);
-    pthread_mutex_unlock(&engine->cache_lock);
+
+    put_assoc_bucket_ref(engine, bucket);
+
+    if (! engine->assoc.expanding && engine->assoc.hash_items > 
(hashsize(engine->assoc.hashpower) * 3) / 2) {
+        pthread_rwlock_wrlock(&engine->cache_lock);
+        if (! engine->assoc.expanding) /* re-check expansion condition after 
lock acquisition */
+            assoc_expand(engine);
+        pthread_rwlock_unlock(&engine->cache_lock);
+    }
 
     return 1;
 }
 
-void assoc_delete(struct default_engine *engine, uint32_t hash, const char 
*key, const size_t nkey) {
-    pthread_mutex_lock(&engine->cache_lock);
+void assoc_delete(struct default_engine *engine, hash_item *it) {
+
+    struct assoc_bucket *bucket= get_assoc_bucket_ref(engine, it->hash, 1);
 
-    hash_item **before = _hashitem_before(engine, hash, key, nkey);
+    hash_item **before = _hashitem_before(engine, bucket, item_get_key(it), 
it->nkey);
 
     if (*before) {
         hash_item *nxt;
@@ -156,10 +210,10 @@ void assoc_delete(struct default_engine *engine, uint32_t 
hash, const char *key,
         nxt = (*before)->h_next;
         (*before)->h_next = 0;   /* probably pointless, but whatever. */
         *before = nxt;
-        pthread_mutex_unlock(&engine->cache_lock);
+        put_assoc_bucket_ref(engine, bucket);
         return;
     }
-    pthread_mutex_unlock(&engine->cache_lock);
+    put_assoc_bucket_ref(engine, bucket);
     /* Note:  we never actually get here.  the callers don't delete things
        they can't find. */
     assert(*before != 0);
@@ -175,23 +229,22 @@ static void *assoc_maintenance_thread(void *arg) {
     bool done = false;
     do {
         int ii;
-        pthread_mutex_lock(&engine->cache_lock);
+        pthread_rwlock_wrlock(&engine->cache_lock);
 
         for (ii = 0; ii < hash_bulk_move && engine->assoc.expanding; ++ii) {
             hash_item *it, *next;
             int bucket;
 
-            for (it = engine->assoc.old_hashtable[engine->assoc.expand_bucket];
+            for (it = 
engine->assoc.old_hashtable[engine->assoc.expand_bucket].head;
                  NULL != it; it = next) {
                 next = it->h_next;
 
                 bucket = engine->server.core->hash(item_get_key(it), it->nkey, 
0)
                     & hashmask(engine->assoc.hashpower);
-                it->h_next = engine->assoc.primary_hashtable[bucket];
-                engine->assoc.primary_hashtable[bucket] = it;
+                it->h_next = engine->assoc.primary_hashtable[bucket].head;
+                engine->assoc.primary_hashtable[bucket].head = it;
             }
 
-            engine->assoc.old_hashtable[engine->assoc.expand_bucket] = NULL;
             engine->assoc.expand_bucket++;
             if (engine->assoc.expand_bucket == 
hashsize(engine->assoc.hashpower - 1)) {
                 engine->assoc.expanding = false;
@@ -207,7 +260,7 @@ static void *assoc_maintenance_thread(void *arg) {
         if (!engine->assoc.expanding) {
             done = true;
         }
-        pthread_mutex_unlock(&engine->cache_lock);
+        pthread_rwlock_unlock(&engine->cache_lock);
     } while (!done);
 
     return NULL;
diff --git a/engines/default_engine/assoc.h b/engines/default_engine/assoc.h
index dc0bf60..47a2718 100644
--- a/engines/default_engine/assoc.h
+++ b/engines/default_engine/assoc.h
@@ -1,19 +1,26 @@
 #ifndef ASSOC_H
 #define ASSOC_H
 
+#include <pthread.h>
+
+struct assoc_bucket {      /* one hash table slot: an item chain and its lock */
+    pthread_rwlock_t lock; /* taken rd/wr by get_assoc_bucket_ref() in assoc.c */
+    hash_item* head;       /* first item of the chain, linked through h_next */
+};
+
 struct assoc {
    /* how many powers of 2's worth of buckets we use */
    unsigned int hashpower;
 
 
    /* Main hash table. This is where we look except during expansion. */
-   hash_item** primary_hashtable;
+   struct assoc_bucket* primary_hashtable;
 
    /*
     * Previous hash table. During expansion, we look here for keys that haven't
     * been moved over to the primary yet.
     */
-   hash_item** old_hashtable;
+   struct assoc_bucket* old_hashtable;
 
    /* Number of items in the hash table. */
    unsigned int hash_items;
@@ -32,10 +39,8 @@ struct assoc {
 ENGINE_ERROR_CODE assoc_init(struct default_engine *engine);
 hash_item *assoc_find(struct default_engine *engine, uint32_t hash,
                       const char *key, const size_t nkey);
-int assoc_insert(struct default_engine *engine, uint32_t hash,
-                 hash_item *item);
-void assoc_delete(struct default_engine *engine, uint32_t hash,
-                  const char *key, const size_t nkey);
+int assoc_insert(struct default_engine *engine, hash_item *item);
+void assoc_delete(struct default_engine *engine, hash_item *item);
 int start_assoc_maintenance_thread(struct default_engine *engine);
 void stop_assoc_maintenance_thread(struct default_engine *engine);
 
diff --git a/engines/default_engine/default_engine.c 
b/engines/default_engine/default_engine.c
index b7827d0..3588e3b 100644
--- a/engines/default_engine/default_engine.c
+++ b/engines/default_engine/default_engine.c
@@ -196,7 +196,7 @@ ENGINE_ERROR_CODE create_instance(uint64_t interface,
       .slabs = {
          .lock = PTHREAD_MUTEX_INITIALIZER
       },
-      .cache_lock = PTHREAD_MUTEX_INITIALIZER,
+      .cache_lock = PTHREAD_RWLOCK_INITIALIZER,
       .stats = {
          .lock = PTHREAD_MUTEX_INITIALIZER,
       },
@@ -290,7 +290,7 @@ static void default_destroy(ENGINE_HANDLE* handle, const 
bool force) {
    struct default_engine* se = get_handle(handle);
 
    if (se->initialized) {
-      pthread_mutex_destroy(&se->cache_lock);
+      pthread_rwlock_destroy(&se->cache_lock);
       pthread_mutex_destroy(&se->stats.lock);
       pthread_mutex_destroy(&se->slabs.lock);
       se->initialized = false;
diff --git a/engines/default_engine/default_engine.h 
b/engines/default_engine/default_engine.h
index 72ac7fd..8209038 100644
--- a/engines/default_engine/default_engine.h
+++ b/engines/default_engine/default_engine.h
@@ -125,7 +125,7 @@ struct default_engine {
     * The cache layer (item_* and assoc_*) is currently protected by
     * this single mutex
     */
-   pthread_mutex_t cache_lock;
+   pthread_rwlock_t cache_lock;
 
    /* Used to arbitrate concurrent access/modification of items by
     * multiple threads */
diff --git a/engines/default_engine/items.c b/engines/default_engine/items.c
index 86b78b9..befcc43 100644
--- a/engines/default_engine/items.c
+++ b/engines/default_engine/items.c
@@ -315,7 +315,7 @@ int do_item_link(struct default_engine *engine, hash_item 
*it) {
     assert(it->nbytes < (1024 * 1024));  /* 1MB max size */
     it->iflag |= ITEM_LINKED;
     it->time = engine->server.core->get_current_time();
-    assoc_insert(engine, it->hash, it);
+    assoc_insert(engine, it);
 
     pthread_mutex_lock(&engine->stats.lock);
     engine->stats.curr_bytes += ITEM_ntotal(engine, it);
@@ -339,7 +339,7 @@ void do_item_unlink(struct default_engine *engine, 
hash_item *it) {
         engine->stats.curr_bytes -= ITEM_ntotal(engine, it);
         engine->stats.curr_items -= 1;
         pthread_mutex_unlock(&engine->stats.lock);
-        assoc_delete(engine, it->hash, item_get_key(it), it->nkey);
+        assoc_delete(engine, it);
         item_unlink_q(engine, it);
         if (it->refcount == 0) {
             item_free(engine, it);
-- 
1.7.1

Reply via email to