From: Ripduman Sohan <[email protected]>

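Replace the single items lock with one mutex per LRU list (one per slab
class), plus a separate itemstats_lock protecting the per-class item
statistics, so that operations on different slab classes can proceed in
parallel.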

Signed-off-by: Ripduman Sohan <[email protected]>
---
 engines/default_engine/default_engine.c |    3 -
 engines/default_engine/items.c          |  123 ++++++++++++++++++++-----------
 engines/default_engine/items.h          |    3 +-
 3 files changed, 81 insertions(+), 48 deletions(-)

diff --git a/engines/default_engine/default_engine.c 
b/engines/default_engine/default_engine.c
index 0be4191..b7827d0 100644
--- a/engines/default_engine/default_engine.c
+++ b/engines/default_engine/default_engine.c
@@ -196,9 +196,6 @@ ENGINE_ERROR_CODE create_instance(uint64_t interface,
       .slabs = {
          .lock = PTHREAD_MUTEX_INITIALIZER
       },
-      .items = {
-         .lock = PTHREAD_MUTEX_INITIALIZER,
-       },
       .cache_lock = PTHREAD_MUTEX_INITIALIZER,
       .stats = {
          .lock = PTHREAD_MUTEX_INITIALIZER,
diff --git a/engines/default_engine/items.c b/engines/default_engine/items.c
index 2105931..3b623fa 100644
--- a/engines/default_engine/items.c
+++ b/engines/default_engine/items.c
@@ -31,6 +31,9 @@ static void item_free(struct default_engine *engine, hash_item *it);
 static void acquire_key_operation_mutex(struct default_engine *engine, const char *key, size_t nkey);
 static void release_key_operation_mutex(struct default_engine *engine, const char *key, size_t nkey);
 
+/* Lock/unlock the mutex guarding the LRU list of slab class `listid`. */
+#define lock_lru_list(engine, listid) pthread_mutex_lock(&(engine)->items.lock[(listid)])
+#define unlock_lru_list(engine, listid) pthread_mutex_unlock(&(engine)->items.lock[(listid)])
 /*
  * We only reposition items in the LRU queue if they haven't been repositioned
  * in this many seconds. That saves us from churning on frequently-accessed
@@ -44,9 +47,9 @@ static void release_key_operation_mutex(struct default_engine 
*engine, const cha
 static const int search_items = 50;
 
 void item_stats_reset(struct default_engine *engine) {
-    pthread_mutex_lock(&engine->items.lock);
+    pthread_mutex_lock(&engine->items.itemstats_lock);
     memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
-    pthread_mutex_unlock(&engine->items.lock);
+    pthread_mutex_unlock(&engine->items.itemstats_lock);
 }
 
 
@@ -103,7 +106,7 @@ hash_item *do_item_alloc(struct default_engine *engine,
     rel_time_t oldest_live = engine->config.oldest_live;
     rel_time_t current_time = engine->server.core->get_current_time();
 
-    pthread_mutex_lock(&engine->items.lock);
+    lock_lru_list(engine, id);
     for (search = engine->items.tails[id];
          tries > 0 && search != NULL;
          tries--, search=search->prev) {
@@ -117,7 +120,11 @@ hash_item *do_item_alloc(struct default_engine *engine,
             pthread_mutex_lock(&engine->stats.lock);
             engine->stats.reclaimed++;
             pthread_mutex_unlock(&engine->stats.lock);
+
+            pthread_mutex_lock(&engine->items.itemstats_lock);
             engine->items.itemstats[id].reclaimed++;
+            pthread_mutex_unlock(&engine->items.itemstats_lock);
+
             it->refcount = 1;
             slabs_adjust_mem_requested(engine, it->slabs_clsid, 
ITEM_ntotal(engine, it), ntotal);
             do_item_unlink(engine, it);
@@ -127,7 +134,7 @@ hash_item *do_item_alloc(struct default_engine *engine,
             break;
         }
     }
-    pthread_mutex_unlock(&engine->items.lock);
+    unlock_lru_list(engine, id);
 
     if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
         /*
@@ -141,9 +148,9 @@ hash_item *do_item_alloc(struct default_engine *engine,
          */
 
         if (engine->config.evict_to_free == 0) {
-            pthread_mutex_lock(&engine->items.lock);
+            pthread_mutex_lock(&engine->items.itemstats_lock);
             engine->items.itemstats[id].outofmemory++;
-            pthread_mutex_unlock(&engine->items.lock);
+            pthread_mutex_unlock(&engine->items.itemstats_lock);
             return NULL;
         }
 
@@ -155,21 +162,23 @@ hash_item *do_item_alloc(struct default_engine *engine,
          */
 
         if (engine->items.tails[id] == 0) {
-            pthread_mutex_lock(&engine->items.lock);
+            pthread_mutex_lock(&engine->items.itemstats_lock);
             engine->items.itemstats[id].outofmemory++;
-            pthread_mutex_lock(&engine->items.lock);
+            pthread_mutex_unlock(&engine->items.itemstats_lock);
             return NULL;
         }
 
-        pthread_mutex_lock(&engine->items.lock);
+        lock_lru_list(engine, id);
         for (search = engine->items.tails[id]; tries > 0 && search != NULL; 
tries--, search=search->prev) {
             if (search->refcount == 0) {
                 if (search->exptime == 0 || search->exptime > current_time) {
+                    pthread_mutex_lock(&engine->items.itemstats_lock);
                     engine->items.itemstats[id].evicted++;
                     engine->items.itemstats[id].evicted_time = current_time - 
search->time;
                     if (search->exptime != 0) {
                         engine->items.itemstats[id].evicted_nonzero++;
                     }
+                    pthread_mutex_unlock(&engine->items.itemstats_lock);
                     pthread_mutex_lock(&engine->stats.lock);
                     engine->stats.evictions++;
                     pthread_mutex_unlock(&engine->stats.lock);
@@ -177,7 +186,10 @@ hash_item *do_item_alloc(struct default_engine *engine,
                                                   item_get_key(search),
                                                   search->nkey);
                 } else {
+                    pthread_mutex_lock(&engine->items.itemstats_lock);
                     engine->items.itemstats[id].reclaimed++;
+                    pthread_mutex_unlock(&engine->items.itemstats_lock);
+
                     pthread_mutex_lock(&engine->stats.lock);
                     engine->stats.reclaimed++;
                     pthread_mutex_unlock(&engine->stats.lock);
@@ -186,12 +198,13 @@ hash_item *do_item_alloc(struct default_engine *engine,
                 break;
             }
         }
-        pthread_mutex_unlock(&engine->items.lock);
+        unlock_lru_list(engine, id);
 
         it = slabs_alloc(engine, ntotal, id);
         if (it == 0) {
-            pthread_mutex_lock(&engine->items.lock);
+            pthread_mutex_lock(&engine->items.itemstats_lock);
             engine->items.itemstats[id].outofmemory++;
+            pthread_mutex_unlock(&engine->items.itemstats_lock);
             /* Last ditch effort. There is a very rare bug which causes
              * refcount leaks. We've fixed most of them, but it still happens,
              * and it may happen in the future.
@@ -200,15 +213,18 @@ hash_item *do_item_alloc(struct default_engine *engine,
              * free it anyway.
              */
             tries = search_items;
+            lock_lru_list(engine, id);
             for (search = engine->items.tails[id]; tries > 0 && search != 
NULL; tries--, search=search->prev) {
                 if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < 
current_time) {
+                    pthread_mutex_lock(&engine->items.itemstats_lock);
                     engine->items.itemstats[id].tailrepairs++;
+                    pthread_mutex_unlock(&engine->items.itemstats_lock);
                     search->refcount = 0;
                     do_item_unlink(engine, search);
                     break;
                 }
             }
-            pthread_mutex_unlock(&engine->items.lock);
+            unlock_lru_list(engine, id);
             it = slabs_alloc(engine, ntotal, id);
             if (it == 0) {
                 return NULL;
@@ -350,13 +366,13 @@ void do_item_update(struct default_engine *engine, 
hash_item *it) {
     if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
         assert((it->iflag & ITEM_SLABBED) == 0);
 
+        lock_lru_list(engine, it->slabs_clsid);
         if ((it->iflag & ITEM_LINKED) != 0) {
-            pthread_mutex_lock(&engine->items.lock);
             item_unlink_q(engine, it);
             it->time = current_time;
             item_link_q(engine, it);
-            pthread_mutex_unlock(&engine->items.lock);
         }
+        unlock_lru_list(engine, it->slabs_clsid);
     }
 }
 
@@ -366,10 +382,14 @@ int do_item_replace(struct default_engine *engine,
                            item_get_key(new_it), new_it->nkey, new_it->nbytes);
     assert((it->iflag & ITEM_SLABBED) == 0);
 
-    pthread_mutex_lock(&engine->items.lock);
+    lock_lru_list(engine, it->slabs_clsid);
     do_item_unlink(engine, it);
+    unlock_lru_list(engine, it->slabs_clsid);
+
+    lock_lru_list(engine, new_it->slabs_clsid);
     int rc = do_item_link(engine, new_it);
-    pthread_mutex_unlock(&engine->items.lock);
+    unlock_lru_list(engine, new_it->slabs_clsid);
+
     return rc;
 }
 
@@ -428,6 +448,7 @@ static void do_item_stats(struct default_engine *engine,
     int i;
     rel_time_t current_time = engine->server.core->get_current_time();
     for (i = 0; i < POWER_LARGEST; i++) {
+        lock_lru_list(engine, i);
         if (engine->items.tails[i] != NULL) {
             int search = search_items;
             while (search > 0 &&
@@ -446,6 +467,7 @@ static void do_item_stats(struct default_engine *engine,
             }
             if (engine->items.tails[i] == NULL) {
                 /* We removed all of the items in this slab class */
+                unlock_lru_list(engine, i);
                 continue;
             }
 
@@ -467,6 +489,7 @@ static void do_item_stats(struct default_engine *engine,
             add_statistics(c, add_stats, prefix, i, "reclaimed",
                            "%u", engine->items.itemstats[i].reclaimed);;
         }
+        unlock_lru_list(engine, i);
     }
 }
 
@@ -484,6 +507,7 @@ static void do_item_stats_sizes(struct default_engine 
*engine,
 
         /* build the histogram */
         for (i = 0; i < POWER_LARGEST; i++) {
+            lock_lru_list(engine, i);
             hash_item *iter = engine->items.heads[i];
             while (iter) {
                 int ntotal = ITEM_ntotal(engine, iter);
@@ -492,6 +516,7 @@ static void do_item_stats_sizes(struct default_engine 
*engine,
                 if (bucket < num_buckets) histogram[bucket]++;
                 iter = iter->next;
             }
+            unlock_lru_list(engine, i);
         }
 
         /* write the buffer */
@@ -536,9 +561,10 @@ hash_item *do_item_get(struct default_engine *engine,
     if (it != NULL && engine->config.oldest_live != 0 &&
         engine->config.oldest_live <= current_time &&
         it->time <= engine->config.oldest_live) {
-        pthread_mutex_lock(&engine->items.lock);
+        int id = it->slabs_clsid;
+        lock_lru_list(engine, id);
         do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
-        pthread_mutex_unlock(&engine->items.lock);
+        unlock_lru_list(engine, id);
         it = NULL;
     }
 
@@ -550,9 +576,10 @@ hash_item *do_item_get(struct default_engine *engine,
     }
 
     if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
-        pthread_mutex_lock(&engine->items.lock);
+        int id = it->slabs_clsid;
+        lock_lru_list(engine, id);
         do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
-        pthread_mutex_unlock(&engine->items.lock);
+        unlock_lru_list(engine, id);
         it = NULL;
     }
 
@@ -670,9 +697,9 @@ static ENGINE_ERROR_CODE do_store_item(struct 
default_engine *engine,
             if (old_it != NULL) {
                 do_item_replace(engine, old_it, it);
             } else {
-                pthread_mutex_lock(&engine->items.lock);
+                lock_lru_list(engine, it->slabs_clsid);
                 do_item_link(engine, it);
-                pthread_mutex_unlock(&engine->items.lock);
+                unlock_lru_list(engine, it->slabs_clsid);
             }
 
             *cas = item_get_cas(it);
@@ -755,9 +782,9 @@ static ENGINE_ERROR_CODE do_add_delta(struct default_engine 
*engine,
                                           it->exptime, res,
                                           cookie);
         if (new_it == NULL) {
-            pthread_mutex_lock(&engine->items.lock);
+            lock_lru_list(engine, it->slabs_clsid);
             do_item_unlink(engine, it);
-            pthread_mutex_unlock(&engine->items.lock);
+            unlock_lru_list(engine, it->slabs_clsid);
             return ENGINE_ENOMEM;
         }
         memcpy(item_get_data(new_it), buf, res);
@@ -812,9 +839,9 @@ void item_release(struct default_engine *engine, hash_item 
*item) {
  */
 void item_unlink(struct default_engine *engine, hash_item *item) {
     acquire_key_operation_mutex(engine, item_get_key(item), item->nkey);
-    pthread_mutex_lock(&engine->items.lock);
+    lock_lru_list(engine, item->slabs_clsid);
     do_item_unlink(engine, item);
-    pthread_mutex_unlock(&engine->items.lock);
+    unlock_lru_list(engine, item->slabs_clsid);
     release_key_operation_mutex(engine, item_get_key(item), item->nkey);
 }
 
@@ -930,7 +957,6 @@ void item_flush_expired(struct default_engine *engine, 
time_t when) {
     int i;
     hash_item *iter, *next;
 
-    pthread_mutex_lock(&engine->items.lock);
     if (when == 0) {
         engine->config.oldest_live = engine->server.core->get_current_time() - 
1;
     } else {
@@ -946,6 +972,7 @@ void item_flush_expired(struct default_engine *engine, 
time_t when) {
              * oldest_live time.
              * The oldest_live checking will auto-expire the remaining items.
              */
+            lock_lru_list(engine, i);
             for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
                 if (iter->time >= engine->config.oldest_live) {
                     next = iter->next;
@@ -957,9 +984,9 @@ void item_flush_expired(struct default_engine *engine, 
time_t when) {
                     break;
                 }
             }
+            unlock_lru_list(engine, i);
         }
     }
-    pthread_mutex_unlock(&engine->items.lock);
 }
 
 /*
@@ -971,27 +998,23 @@ char *item_cachedump(struct default_engine *engine,
                      unsigned int *bytes) {
     char *ret;
 
-    pthread_mutex_lock(&engine->items.lock);
+    lock_lru_list(engine, slabs_clsid);
     ret = do_item_cachedump(slabs_clsid, limit, bytes);
-    pthread_mutex_unlock(&engine->items.lock);
+    unlock_lru_list(engine, slabs_clsid);
     return ret;
 }
 
 void item_stats(struct default_engine *engine,
                    ADD_STAT add_stat, const void *cookie)
 {
-    pthread_mutex_lock(&engine->items.lock);
     do_item_stats(engine, add_stat, cookie);
-    pthread_mutex_unlock(&engine->items.lock);
 }
 
 
 void item_stats_sizes(struct default_engine *engine,
                       ADD_STAT add_stat, const void *cookie)
 {
-    pthread_mutex_lock(&engine->items.lock);
     do_item_stats_sizes(engine, add_stat, cookie);
-    pthread_mutex_unlock(&engine->items.lock);
 }
 
 static void do_item_link_cursor(struct default_engine *engine,
@@ -1073,9 +1096,11 @@ static void item_scrub_class(struct default_engine *engine,
     ENGINE_ERROR_CODE ret;
     bool more;
+    /* Read once so every lock/unlock pair below names the same LRU list. */
+    const int clsid = cursor->slabs_clsid;
     do {
-        pthread_mutex_lock(&engine->items.lock);
+        lock_lru_list(engine, clsid);
         more = do_item_walk_cursor(engine, cursor, 200, item_scrub, NULL, &ret);
-        pthread_mutex_unlock(&engine->items.lock);
+        unlock_lru_list(engine, clsid);
         if (ret != ENGINE_SUCCESS) {
             break;
         }
@@ -1088,7 +1113,7 @@ static void *item_scubber_main(void *arg)
     hash_item cursor = { .refcount = 1 };
 
     for (int ii = 0; ii < POWER_LARGEST; ++ii) {
-        pthread_mutex_lock(&engine->items.lock);
+        lock_lru_list(engine, ii);
         bool skip = false;
         if (engine->items.heads[ii] == NULL) {
             skip = true;
@@ -1096,11 +1121,11 @@ static void *item_scubber_main(void *arg)
             // add the item at the tail
             do_item_link_cursor(engine, &cursor, ii);
         }
-        pthread_mutex_unlock(&engine->items.lock);
+        unlock_lru_list(engine, ii);
 
         if (!skip) {
             item_scrub_class(engine, &cursor);
         }
     }
 
     pthread_mutex_lock(&engine->scrubber.lock);
@@ -1173,16 +1195,29 @@ static tap_event_t do_item_tap_walker(struct default_engine *engine,
     client->it = NULL;
 
     ENGINE_ERROR_CODE r;
+    /* Track the list we actually hold: do_item_link_cursor() below can move
+     * the cursor into a later slab class, changing client->cursor.slabs_clsid. */
+    int locked_clsid = client->cursor.slabs_clsid;
+    lock_lru_list(engine, locked_clsid);
     do {
         if (!do_item_walk_cursor(engine, &client->cursor, 1, item_tap_iterfunc, client, &r)) {
             // find next slab class to look at..
             bool linked = false;
             for (int ii = client->cursor.slabs_clsid + 1; ii < POWER_LARGEST && !linked;  ++ii) {
+                /* ii > locked_clsid, so LRU locks are always taken in ascending order. */
+                lock_lru_list(engine, ii);
                 if (engine->items.heads[ii] != NULL) {
                     // add the item at the tail
                     do_item_link_cursor(engine, &client->cursor, ii);
                     linked = true;
                 }
+                if (linked) {
+                    /* Keep holding list ii, where the cursor now lives. */
+                    unlock_lru_list(engine, locked_clsid);
+                    locked_clsid = ii;
+                } else {
+                    unlock_lru_list(engine, ii);
+                }
             }
             if (!linked) {
                 break;
@@ -1190,6 +1225,7 @@ static tap_event_t do_item_tap_walker(struct default_engine *engine,
         }
     } while (client->it == NULL);
     *itm = client->it;
+    unlock_lru_list(engine, locked_clsid);
 
     return (*itm == NULL) ? TAP_DISCONNECT : TAP_MUTATION;
 }
@@ -1202,9 +1228,7 @@ tap_event_t item_tap_walker(ENGINE_HANDLE* handle,
 {
     tap_event_t ret;
     struct default_engine *engine = (struct default_engine*)handle;
-    pthread_mutex_lock(&engine->items.lock);
     ret = do_item_tap_walker(engine, cookie, itm, es, nes, ttl, flags, seqno, 
vbucket);
-    pthread_mutex_unlock(&engine->items.lock);
 
     return ret;
 }
@@ -1221,13 +1245,13 @@ bool initialize_item_tap_walker(struct default_engine 
*engine,
     /* Link the cursor! */
     bool linked = false;
     for (int ii = 0; ii < POWER_LARGEST && !linked; ++ii) {
-        pthread_mutex_lock(&engine->items.lock);
+        lock_lru_list(engine, ii);
         if (engine->items.heads[ii] != NULL) {
             // add the item at the tail
             do_item_link_cursor(engine, &client->cursor, ii);
             linked = true;
         }
-        pthread_mutex_unlock(&engine->items.lock);
+        unlock_lru_list(engine, ii);
     }
 
     engine->server.cookie->store_engine_specific(cookie, client);
@@ -1236,8 +1260,18 @@ bool initialize_item_tap_walker(struct default_engine 
*engine,
 
 ENGINE_ERROR_CODE items_init(struct default_engine *engine, uint32_t 
num_threads)
 {
-    pthread_mutex_init(&engine->cur_op_recs.lock, NULL);
-    pthread_cond_init(&engine->cur_op_recs.key_wait_cv, NULL);
+    if (pthread_mutex_init(&engine->cur_op_recs.lock, NULL))
+        return ENGINE_FAILED;
+
+    if (pthread_cond_init(&engine->cur_op_recs.key_wait_cv, NULL))
+        return ENGINE_FAILED;
+
+    for (int i = 0; i < POWER_LARGEST; i++)
+        if (pthread_mutex_init(engine->items.lock + i, NULL))
+            return ENGINE_FAILED;
+
+    if (pthread_mutex_init(&engine->items.itemstats_lock, NULL))
+        return ENGINE_FAILED;
 
     engine->cur_op_recs.keys = calloc(num_threads, 
sizeof(*engine->cur_op_recs.keys));
     if (engine->cur_op_recs.keys == NULL)
@@ -1306,6 +1340,7 @@ void items_destroy(struct default_engine *engine)
 {
     pthread_mutex_destroy(&engine->cur_op_recs.lock);
     pthread_cond_destroy(&engine->cur_op_recs.key_wait_cv);
+
     for (int i = 0; i < POWER_LARGEST; i++)
         pthread_mutex_destroy(engine->items.lock + i);
 
diff --git a/engines/default_engine/items.h b/engines/default_engine/items.h
index 62aa1f9..6b02624 100644
--- a/engines/default_engine/items.h
+++ b/engines/default_engine/items.h
@@ -38,7 +38,8 @@ struct items {
    hash_item *tails[POWER_LARGEST];
    itemstats_t itemstats[POWER_LARGEST];
    unsigned int sizes[POWER_LARGEST];
-   pthread_mutex_t lock;
+   pthread_mutex_t lock[POWER_LARGEST];
+   pthread_mutex_t itemstats_lock;
 };
 
 struct cur_key_rec {
-- 
1.7.1

Reply via email to