From: Ripduman Sohan <[email protected]>

This improves the "item mutex" datastructure introduced in af3fcca940.
To improve scalability, we now use an array of locks and the lower N
bits to mask off and index the relevant lock.  Thus, threads with the
lower N bits being identical will be serialised.  This can be improved
by either increasing N (at O(N^2) space cost) or expanding this to its
logical conclusion -- a hashtable such that a comparison is done on
the whole key.

However, ultimately the proper solution lies in allowing multiple
threads to access and manipulate a given item concurrently as
described in 2bbc817f

Signed-off-by: Ripduman Sohan <[email protected]>
---
 engines/default_engine/default_engine.c |    2 +-
 engines/default_engine/default_engine.h |    2 +-
 engines/default_engine/items.c          |   93 ++++++++++++++-----------------
 engines/default_engine/items.h          |   16 +++--
 4 files changed, 53 insertions(+), 60 deletions(-)

diff --git a/engines/default_engine/default_engine.c 
b/engines/default_engine/default_engine.c
index b6e9c68..3fd282d 100644
--- a/engines/default_engine/default_engine.c
+++ b/engines/default_engine/default_engine.c
@@ -275,7 +275,7 @@ static ENGINE_ERROR_CODE default_initialize(ENGINE_HANDLE* 
handle,
       return ret;
    }
 
-   ret = items_init(se, se->config.num_threads);
+   ret = items_init(se);
    if (ret != ENGINE_SUCCESS) {
        return ret;
    }
diff --git a/engines/default_engine/default_engine.h 
b/engines/default_engine/default_engine.h
index 8209038..7541a1d 100644
--- a/engines/default_engine/default_engine.h
+++ b/engines/default_engine/default_engine.h
@@ -129,7 +129,7 @@ struct default_engine {
 
    /* Used to arbitrate concurrent access/modification of items by
     * multiple threads */
-   struct cur_op_recs cur_op_recs;
+   struct cur_op_recs_htable cur_op_recs_htable;
 
    struct config config;
    struct engine_stats stats;
diff --git a/engines/default_engine/items.c b/engines/default_engine/items.c
index 0aacf4a..66af2a7 100644
--- a/engines/default_engine/items.c
+++ b/engines/default_engine/items.c
@@ -46,6 +46,9 @@ static void release_key_operation_mutex(struct default_engine 
*engine, uint32_t
  */
 static const int search_items = 50;
 
+/* Number of bits to use in mask for key operation mutex */
+#define KEY_OP_MUTEX_MASK_BITS 10
+
 void item_stats_reset(struct default_engine *engine) {
     mutex_lock(&engine->items.itemstats_lock);
     memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
@@ -1258,94 +1261,82 @@ bool initialize_item_tap_walker(struct default_engine 
*engine,
     return true;
 }
 
-ENGINE_ERROR_CODE items_init(struct default_engine *engine, uint32_t 
num_threads)
+ENGINE_ERROR_CODE items_init(struct default_engine *engine)
 {
-    if (pthread_mutex_init(&engine->cur_op_recs.lock, NULL))
-        return ENGINE_FAILED;
-
-    if (pthread_cond_init(&engine->cur_op_recs.key_wait_cv, NULL))
-        return ENGINE_FAILED;
+    uint32_t nr_buckets = (1U << KEY_OP_MUTEX_MASK_BITS);
+    uint32_t i;
 
-    for (int i = 0; i < POWER_LARGEST; i++)
+    for (i = 0; i < POWER_LARGEST; i++)
         if (pthread_mutex_init(engine->items.lock + i, NULL))
             return ENGINE_FAILED;
 
     if (pthread_mutex_init(&engine->items.itemstats_lock, NULL))
         return ENGINE_FAILED;
 
-    engine->cur_op_recs.keys = calloc(num_threads, 
sizeof(*engine->cur_op_recs.keys));
-    if (engine->cur_op_recs.keys == NULL)
+    engine->cur_op_recs_htable.buckets = calloc(nr_buckets,
+                                                 
sizeof(*engine->cur_op_recs_htable.buckets));
+
+    if (engine->cur_op_recs_htable.buckets == NULL)
         return ENGINE_ENOMEM;
 
-    engine->cur_op_recs.num_recs = num_threads;
+    engine->cur_op_recs_htable.nr_buckets = nr_buckets;
+
+    for (i = 0; i < nr_buckets; i++)
+    {
+        if (pthread_mutex_init(&engine->cur_op_recs_htable.buckets[i].lock, 
NULL))
+            return ENGINE_FAILED;
 
-    for (int i = 0; i < engine->cur_op_recs.num_recs; i++)
-        engine->cur_op_recs.keys[i].hash = -1;
+        if (pthread_cond_init(&engine->cur_op_recs_htable.buckets[i].wait_cv, 
NULL))
+            return ENGINE_FAILED;
+    }
 
     return ENGINE_SUCCESS;
 }
 
 static void acquire_key_operation_mutex(struct default_engine *engine, 
uint32_t hash)
 {
-    int i;
-    int zero_index;
-    struct cur_op_recs *cur_op_recs = &engine->cur_op_recs;
+    uint32_t bucket_nr = hash & (engine->cur_op_recs_htable.nr_buckets - 1);
+    struct cur_op_rec_bucket *bucket = engine->cur_op_recs_htable.buckets + 
bucket_nr;
 
-    mutex_lock(&cur_op_recs->lock);
+    mutex_lock(&bucket->lock);
 
 start:
-    zero_index = cur_op_recs->num_recs;
-
-    for (i = 0; i < cur_op_recs->num_recs; i++)
-    {
-        if (cur_op_recs->keys[i].hash == -1)
-            zero_index = i;
-
-        if (cur_op_recs->keys[i].hash == hash) {
-            pthread_cond_wait(&cur_op_recs->key_wait_cv, &cur_op_recs->lock);
-            goto start;
-        }
-
+    if (bucket->in_use) {
+        pthread_cond_wait(&bucket->wait_cv, &bucket->lock);
+        goto start;
     }
 
-    assert(zero_index != cur_op_recs->num_recs);
+    bucket->in_use = true;
 
-    cur_op_recs->keys[zero_index].hash = hash;
-    mutex_unlock(&cur_op_recs->lock);
+    mutex_unlock(&bucket->lock);
 }
 
 static void release_key_operation_mutex(struct default_engine *engine, 
uint32_t hash)
 {
-    int i;
-    struct cur_op_recs *cur_op_recs = &engine->cur_op_recs;
+    uint32_t bucket_nr = hash & (engine->cur_op_recs_htable.nr_buckets - 1);
+    struct cur_op_rec_bucket *bucket = engine->cur_op_recs_htable.buckets + 
bucket_nr;
 
-    mutex_lock(&cur_op_recs->lock);
-
-    for (i = 0; i < cur_op_recs->num_recs; i++)
-    {
-        if (cur_op_recs->keys[i].hash == hash)
-        {
-            cur_op_recs->keys[i].hash = -1;
-            break;
-        }
-    }
+    mutex_lock(&bucket->lock);
 
-    assert(i != cur_op_recs->num_recs);
+    bucket->in_use = false;
 
-    pthread_cond_broadcast(&cur_op_recs->key_wait_cv);
-    mutex_unlock(&cur_op_recs->lock);
+    pthread_cond_signal(&bucket->wait_cv);
+    mutex_unlock(&bucket->lock);
 }
 
 void items_destroy(struct default_engine *engine)
 {
-    pthread_mutex_destroy(&engine->cur_op_recs.lock);
-    pthread_cond_destroy(&engine->cur_op_recs.key_wait_cv);
-
     for (int i = 0; i < POWER_LARGEST; i++)
         pthread_mutex_destroy(engine->items.lock + i);
 
     pthread_mutex_destroy(&engine->items.itemstats_lock);
 
-    free(engine->cur_op_recs.keys);
-    engine->cur_op_recs.keys = NULL;
+    for (uint32_t i = 0; i < engine->cur_op_recs_htable.nr_buckets; i++)
+    {
+        pthread_mutex_destroy(&engine->cur_op_recs_htable.buckets[i].lock);
+        pthread_cond_destroy(&engine->cur_op_recs_htable.buckets[i].wait_cv);
+    }
+
+    free(engine->cur_op_recs_htable.buckets);
+    engine->cur_op_recs_htable.buckets = NULL;
 }
diff --git a/engines/default_engine/items.h b/engines/default_engine/items.h
index 4ed6ea9..0c50f17 100644
--- a/engines/default_engine/items.h
+++ b/engines/default_engine/items.h
@@ -43,13 +43,15 @@ struct items {
    pthread_mutex_t itemstats_lock;
 };
 
-struct cur_op_recs {
+struct cur_op_rec_bucket {
     pthread_mutex_t lock;
-    pthread_cond_t  key_wait_cv;
-    int num_recs;
-    struct {
-        int64_t hash;
-    } *keys; /* Dynamically allocated at runtime */
+    pthread_cond_t wait_cv;
+    bool in_use;
+};
+
+struct cur_op_recs_htable {
+    uint32_t nr_buckets;
+    struct cur_op_rec_bucket *buckets; /* Dynamically allocated at runtime */
 };
 
 /**
@@ -201,7 +203,7 @@ tap_event_t item_tap_walker(ENGINE_HANDLE* handle,
 bool initialize_item_tap_walker(struct default_engine *engine,
                                 const void* cookie);
 
-ENGINE_ERROR_CODE items_init(struct default_engine *engine, uint32_t 
num_threads);
+ENGINE_ERROR_CODE items_init(struct default_engine *engine);
 
 void items_destroy(struct default_engine *engine);
 
-- 
1.7.1

Reply via email to