From: Ripduman Sohan <[email protected]> This improves the "item mutex" data structure introduced in af3fcca940. To improve scalability, we now use an array of locks and use the lower N bits of the key hash as a mask to index the relevant lock. Thus, threads operating on keys whose hashes share the same lower N bits will be serialised. This can be improved by either increasing N (at O(2^N) space cost, since the array holds 2^N locks) or expanding this to its logical conclusion -- a hashtable such that a comparison is done on the whole key.
However, ultimately the proper solution lies in allowing multiple threads to access and manipulate a given item concurrently as described in 2bbc817f Signed-off-by: Ripduman Sohan <[email protected]> --- engines/default_engine/default_engine.c | 2 +- engines/default_engine/default_engine.h | 2 +- engines/default_engine/items.c | 93 ++++++++++++++----------------- engines/default_engine/items.h | 16 +++-- 4 files changed, 53 insertions(+), 60 deletions(-) diff --git a/engines/default_engine/default_engine.c b/engines/default_engine/default_engine.c index b6e9c68..3fd282d 100644 --- a/engines/default_engine/default_engine.c +++ b/engines/default_engine/default_engine.c @@ -275,7 +275,7 @@ static ENGINE_ERROR_CODE default_initialize(ENGINE_HANDLE* handle, return ret; } - ret = items_init(se, se->config.num_threads); + ret = items_init(se); if (ret != ENGINE_SUCCESS) { return ret; } diff --git a/engines/default_engine/default_engine.h b/engines/default_engine/default_engine.h index 8209038..7541a1d 100644 --- a/engines/default_engine/default_engine.h +++ b/engines/default_engine/default_engine.h @@ -129,7 +129,7 @@ struct default_engine { /* Used to arbitrate concurrent access/modification of items by * multiple threads */ - struct cur_op_recs cur_op_recs; + struct cur_op_recs_htable cur_op_recs_htable; struct config config; struct engine_stats stats; diff --git a/engines/default_engine/items.c b/engines/default_engine/items.c index 0aacf4a..66af2a7 100644 --- a/engines/default_engine/items.c +++ b/engines/default_engine/items.c @@ -46,6 +46,9 @@ static void release_key_operation_mutex(struct default_engine *engine, uint32_t */ static const int search_items = 50; +/* Number of bits to use in mask for key operation mutex */ +#define KEY_OP_MUTEX_MASK_BITS 10 + void item_stats_reset(struct default_engine *engine) { mutex_lock(&engine->items.itemstats_lock); memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats)); @@ -1258,94 +1261,82 @@ bool 
initialize_item_tap_walker(struct default_engine *engine, return true; } -ENGINE_ERROR_CODE items_init(struct default_engine *engine, uint32_t num_threads) +ENGINE_ERROR_CODE items_init(struct default_engine *engine) { - if (pthread_mutex_init(&engine->cur_op_recs.lock, NULL)) - return ENGINE_FAILED; - - if (pthread_cond_init(&engine->cur_op_recs.key_wait_cv, NULL)) - return ENGINE_FAILED; + uint32_t nr_buckets = (1U << KEY_OP_MUTEX_MASK_BITS); + uint32_t i; - for (int i = 0; i < POWER_LARGEST; i++) + for (i = 0; i < POWER_LARGEST; i++) if (pthread_mutex_init(engine->items.lock + i, NULL)) return ENGINE_FAILED; if (pthread_mutex_init(&engine->items.itemstats_lock, NULL)) return ENGINE_FAILED; - engine->cur_op_recs.keys = calloc(num_threads, sizeof(*engine->cur_op_recs.keys)); - if (engine->cur_op_recs.keys == NULL) + engine->cur_op_recs_htable.buckets = calloc(nr_buckets, + sizeof(*engine->cur_op_recs_htable.buckets)); + + if (engine->cur_op_recs_htable.buckets == NULL) return ENGINE_ENOMEM; - engine->cur_op_recs.num_recs = num_threads; + engine->cur_op_recs_htable.nr_buckets = nr_buckets; + + for (i = 0; i < nr_buckets; i++) + { + if (pthread_mutex_init(&engine->cur_op_recs_htable.buckets[i].lock, NULL)) + return ENGINE_FAILED; - for (int i = 0; i < engine->cur_op_recs.num_recs; i++) - engine->cur_op_recs.keys[i].hash = -1; + if (pthread_cond_init(&engine->cur_op_recs_htable.buckets[i].wait_cv, NULL)) + return ENGINE_FAILED; + } return ENGINE_SUCCESS; } static void acquire_key_operation_mutex(struct default_engine *engine, uint32_t hash) { - int i; - int zero_index; - struct cur_op_recs *cur_op_recs = &engine->cur_op_recs; + uint32_t bucket_nr = hash & (engine->cur_op_recs_htable.nr_buckets - 1); + struct cur_op_rec_bucket *bucket = engine->cur_op_recs_htable.buckets + bucket_nr; - mutex_lock(&cur_op_recs->lock); + mutex_lock(&bucket->lock); start: - zero_index = cur_op_recs->num_recs; - - for (i = 0; i < cur_op_recs->num_recs; i++) - { - if 
(cur_op_recs->keys[i].hash == -1) - zero_index = i; - - if (cur_op_recs->keys[i].hash == hash) { - pthread_cond_wait(&cur_op_recs->key_wait_cv, &cur_op_recs->lock); - goto start; - } - + if (bucket->in_use) { + pthread_cond_wait(&bucket->wait_cv, &bucket->lock); + goto start; } - assert(zero_index != cur_op_recs->num_recs); + bucket->in_use = true; - cur_op_recs->keys[zero_index].hash = hash; - mutex_unlock(&cur_op_recs->lock); + mutex_unlock(&bucket->lock); } static void release_key_operation_mutex(struct default_engine *engine, uint32_t hash) { - int i; - struct cur_op_recs *cur_op_recs = &engine->cur_op_recs; + uint32_t bucket_nr = hash & (engine->cur_op_recs_htable.nr_buckets - 1); + struct cur_op_rec_bucket *bucket = engine->cur_op_recs_htable.buckets + bucket_nr; - mutex_lock(&cur_op_recs->lock); - - for (i = 0; i < cur_op_recs->num_recs; i++) - { - if (cur_op_recs->keys[i].hash == hash) - { - cur_op_recs->keys[i].hash = -1; - break; - } - } + mutex_lock(&bucket->lock); - assert(i != cur_op_recs->num_recs); + bucket->in_use = false; - pthread_cond_broadcast(&cur_op_recs->key_wait_cv); - mutex_unlock(&cur_op_recs->lock); + pthread_cond_signal(&bucket->wait_cv); + mutex_unlock(&bucket->lock); } void items_destroy(struct default_engine *engine) { - pthread_mutex_destroy(&engine->cur_op_recs.lock); - pthread_cond_destroy(&engine->cur_op_recs.key_wait_cv); - for (int i = 0; i < POWER_LARGEST; i++) pthread_mutex_destroy(engine->items.lock + i); pthread_mutex_destroy(&engine->items.itemstats_lock); - free(engine->cur_op_recs.keys); - engine->cur_op_recs.keys = NULL; + for (uint32_t i = 0; i < engine->cur_op_recs_htable.nr_buckets; i++) + { + pthread_mutex_destroy(&engine->cur_op_recs_htable.buckets[i].lock); + pthread_cond_destroy(&engine->cur_op_recs_htable.buckets[i].wait_cv); + } + + free(engine->cur_op_recs_htable.buckets); + engine->cur_op_recs_htable.buckets = NULL; } diff --git a/engines/default_engine/items.h b/engines/default_engine/items.h index 
4ed6ea9..0c50f17 100644 --- a/engines/default_engine/items.h +++ b/engines/default_engine/items.h @@ -43,13 +43,15 @@ struct items { pthread_mutex_t itemstats_lock; }; -struct cur_op_recs { +struct cur_op_rec_bucket { pthread_mutex_t lock; - pthread_cond_t key_wait_cv; - int num_recs; - struct { - int64_t hash; - } *keys; /* Dynamically allocated at runtime */ + pthread_cond_t wait_cv; + bool in_use; +}; + +struct cur_op_recs_htable { + uint32_t nr_buckets; + struct cur_op_rec_bucket *buckets; /* Dynamically allocated at runtime */ }; /** @@ -201,7 +203,7 @@ tap_event_t item_tap_walker(ENGINE_HANDLE* handle, bool initialize_item_tap_walker(struct default_engine *engine, const void* cookie); -ENGINE_ERROR_CODE items_init(struct default_engine *engine, uint32_t num_threads); +ENGINE_ERROR_CODE items_init(struct default_engine *engine); void items_destroy(struct default_engine *engine); -- 1.7.1
