From: Ripduman Sohan <[email protected]> Replace the single item-list lock with one lock per LRU list (one per slab class), plus a separate lock guarding the per-class item statistics, to create more opportunities for parallelism.
Signed-off-by: Ripduman Sohan <[email protected]> --- engines/default_engine/default_engine.c | 3 - engines/default_engine/items.c | 123 ++++++++++++++++++++----------- engines/default_engine/items.h | 3 +- 3 files changed, 81 insertions(+), 48 deletions(-) diff --git a/engines/default_engine/default_engine.c b/engines/default_engine/default_engine.c index 0be4191..b7827d0 100644 --- a/engines/default_engine/default_engine.c +++ b/engines/default_engine/default_engine.c @@ -196,9 +196,6 @@ ENGINE_ERROR_CODE create_instance(uint64_t interface, .slabs = { .lock = PTHREAD_MUTEX_INITIALIZER }, - .items = { - .lock = PTHREAD_MUTEX_INITIALIZER, - }, .cache_lock = PTHREAD_MUTEX_INITIALIZER, .stats = { .lock = PTHREAD_MUTEX_INITIALIZER, diff --git a/engines/default_engine/items.c b/engines/default_engine/items.c index 2105931..3b623fa 100644 --- a/engines/default_engine/items.c +++ b/engines/default_engine/items.c @@ -31,6 +31,9 @@ static void item_free(struct default_engine *engine, hash_item *it); static void acquire_key_operation_mutex(struct default_engine *engine, const char *key, size_t nkey); static void release_key_operation_mutex(struct default_engine *engine, const char *key, size_t nkey); +/* Macros to clarify the items datastructure LRU list acquisition and release */ +#define lock_lru_list(engine, listid) pthread_mutex_lock(&engine->items.lock[listid]) +#define unlock_lru_list(engine, listid) pthread_mutex_unlock(&engine->items.lock[listid]) /* * We only reposition items in the LRU queue if they haven't been repositioned * in this many seconds. 
That saves us from churning on frequently-accessed @@ -44,9 +47,9 @@ static void release_key_operation_mutex(struct default_engine *engine, const cha static const int search_items = 50; void item_stats_reset(struct default_engine *engine) { - pthread_mutex_lock(&engine->items.lock); + pthread_mutex_lock(&engine->items.itemstats_lock); memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats)); - pthread_mutex_unlock(&engine->items.lock); + pthread_mutex_unlock(&engine->items.itemstats_lock); } @@ -103,7 +106,7 @@ hash_item *do_item_alloc(struct default_engine *engine, rel_time_t oldest_live = engine->config.oldest_live; rel_time_t current_time = engine->server.core->get_current_time(); - pthread_mutex_lock(&engine->items.lock); + lock_lru_list(engine, id); for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) { @@ -117,7 +120,11 @@ hash_item *do_item_alloc(struct default_engine *engine, pthread_mutex_lock(&engine->stats.lock); engine->stats.reclaimed++; pthread_mutex_unlock(&engine->stats.lock); + + pthread_mutex_lock(&engine->items.itemstats_lock); engine->items.itemstats[id].reclaimed++; + pthread_mutex_unlock(&engine->items.itemstats_lock); + it->refcount = 1; slabs_adjust_mem_requested(engine, it->slabs_clsid, ITEM_ntotal(engine, it), ntotal); do_item_unlink(engine, it); @@ -127,7 +134,7 @@ hash_item *do_item_alloc(struct default_engine *engine, break; } } - pthread_mutex_unlock(&engine->items.lock); + unlock_lru_list(engine, id); if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) { /* @@ -141,9 +148,9 @@ hash_item *do_item_alloc(struct default_engine *engine, */ if (engine->config.evict_to_free == 0) { - pthread_mutex_lock(&engine->items.lock); + pthread_mutex_lock(&engine->items.itemstats_lock); engine->items.itemstats[id].outofmemory++; - pthread_mutex_unlock(&engine->items.lock); + pthread_mutex_unlock(&engine->items.itemstats_lock); return NULL; } @@ -155,21 +162,23 @@ hash_item 
*do_item_alloc(struct default_engine *engine, */ if (engine->items.tails[id] == 0) { - pthread_mutex_lock(&engine->items.lock); + pthread_mutex_lock(&engine->items.itemstats_lock); engine->items.itemstats[id].outofmemory++; - pthread_mutex_lock(&engine->items.lock); + pthread_mutex_unlock(&engine->items.itemstats_lock); return NULL; } - pthread_mutex_lock(&engine->items.lock); + lock_lru_list(engine, id); for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) { if (search->refcount == 0) { if (search->exptime == 0 || search->exptime > current_time) { + pthread_mutex_lock(&engine->items.itemstats_lock); engine->items.itemstats[id].evicted++; engine->items.itemstats[id].evicted_time = current_time - search->time; if (search->exptime != 0) { engine->items.itemstats[id].evicted_nonzero++; } + pthread_mutex_unlock(&engine->items.itemstats_lock); pthread_mutex_lock(&engine->stats.lock); engine->stats.evictions++; pthread_mutex_unlock(&engine->stats.lock); @@ -177,7 +186,10 @@ hash_item *do_item_alloc(struct default_engine *engine, item_get_key(search), search->nkey); } else { + pthread_mutex_lock(&engine->items.itemstats_lock); engine->items.itemstats[id].reclaimed++; + pthread_mutex_unlock(&engine->items.itemstats_lock); + pthread_mutex_lock(&engine->stats.lock); engine->stats.reclaimed++; pthread_mutex_unlock(&engine->stats.lock); @@ -186,12 +198,13 @@ hash_item *do_item_alloc(struct default_engine *engine, break; } } - pthread_mutex_unlock(&engine->items.lock); + unlock_lru_list(engine, id); it = slabs_alloc(engine, ntotal, id); if (it == 0) { - pthread_mutex_lock(&engine->items.lock); + pthread_mutex_lock(&engine->items.itemstats_lock); engine->items.itemstats[id].outofmemory++; + pthread_mutex_unlock(&engine->items.itemstats_lock); /* Last ditch effort. There is a very rare bug which causes * refcount leaks. We've fixed most of them, but it still happens, * and it may happen in the future. 
@@ -200,15 +213,18 @@ hash_item *do_item_alloc(struct default_engine *engine, * free it anyway. */ tries = search_items; + lock_lru_list(engine, id); for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) { if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) { + pthread_mutex_lock(&engine->items.itemstats_lock); engine->items.itemstats[id].tailrepairs++; + pthread_mutex_unlock(&engine->items.itemstats_lock); search->refcount = 0; do_item_unlink(engine, search); break; } } - pthread_mutex_unlock(&engine->items.lock); + unlock_lru_list(engine, id); it = slabs_alloc(engine, ntotal, id); if (it == 0) { return NULL; @@ -350,13 +366,13 @@ void do_item_update(struct default_engine *engine, hash_item *it) { if (it->time < current_time - ITEM_UPDATE_INTERVAL) { assert((it->iflag & ITEM_SLABBED) == 0); + lock_lru_list(engine, it->slabs_clsid); if ((it->iflag & ITEM_LINKED) != 0) { - pthread_mutex_lock(&engine->items.lock); item_unlink_q(engine, it); it->time = current_time; item_link_q(engine, it); - pthread_mutex_unlock(&engine->items.lock); } + unlock_lru_list(engine, it->slabs_clsid); } } @@ -366,10 +382,14 @@ int do_item_replace(struct default_engine *engine, item_get_key(new_it), new_it->nkey, new_it->nbytes); assert((it->iflag & ITEM_SLABBED) == 0); - pthread_mutex_lock(&engine->items.lock); + lock_lru_list(engine, it->slabs_clsid); do_item_unlink(engine, it); + unlock_lru_list(engine, it->slabs_clsid); + + lock_lru_list(engine, new_it->slabs_clsid); int rc = do_item_link(engine, new_it); - pthread_mutex_unlock(&engine->items.lock); + unlock_lru_list(engine, new_it->slabs_clsid); + return rc; } @@ -428,6 +448,7 @@ static void do_item_stats(struct default_engine *engine, int i; rel_time_t current_time = engine->server.core->get_current_time(); for (i = 0; i < POWER_LARGEST; i++) { + lock_lru_list(engine, i); if (engine->items.tails[i] != NULL) { int search = search_items; while (search > 0 && @@ -446,6 
+467,7 @@ static void do_item_stats(struct default_engine *engine, } if (engine->items.tails[i] == NULL) { /* We removed all of the items in this slab class */ + unlock_lru_list(engine, i); continue; } @@ -467,6 +489,7 @@ static void do_item_stats(struct default_engine *engine, add_statistics(c, add_stats, prefix, i, "reclaimed", "%u", engine->items.itemstats[i].reclaimed);; } + unlock_lru_list(engine, i); } } @@ -484,6 +507,7 @@ static void do_item_stats_sizes(struct default_engine *engine, /* build the histogram */ for (i = 0; i < POWER_LARGEST; i++) { + lock_lru_list(engine, i); hash_item *iter = engine->items.heads[i]; while (iter) { int ntotal = ITEM_ntotal(engine, iter); @@ -492,6 +516,7 @@ static void do_item_stats_sizes(struct default_engine *engine, if (bucket < num_buckets) histogram[bucket]++; iter = iter->next; } + unlock_lru_list(engine, i); } /* write the buffer */ @@ -536,9 +561,10 @@ hash_item *do_item_get(struct default_engine *engine, if (it != NULL && engine->config.oldest_live != 0 && engine->config.oldest_live <= current_time && it->time <= engine->config.oldest_live) { - pthread_mutex_lock(&engine->items.lock); + int id = it->slabs_clsid; + lock_lru_list(engine, id); do_item_unlink(engine, it); /* MTSAFE - cache_lock held */ - pthread_mutex_unlock(&engine->items.lock); + unlock_lru_list(engine, id); it = NULL; } @@ -550,9 +576,10 @@ hash_item *do_item_get(struct default_engine *engine, } if (it != NULL && it->exptime != 0 && it->exptime <= current_time) { - pthread_mutex_lock(&engine->items.lock); + int id = it->slabs_clsid; + lock_lru_list(engine, id); do_item_unlink(engine, it); /* MTSAFE - cache_lock held */ - pthread_mutex_unlock(&engine->items.lock); + unlock_lru_list(engine, id); it = NULL; } @@ -670,9 +697,9 @@ static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine, if (old_it != NULL) { do_item_replace(engine, old_it, it); } else { - pthread_mutex_lock(&engine->items.lock); + lock_lru_list(engine, it->slabs_clsid); 
do_item_link(engine, it); - pthread_mutex_unlock(&engine->items.lock); + unlock_lru_list(engine, it->slabs_clsid); } *cas = item_get_cas(it); @@ -755,9 +782,9 @@ static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine, it->exptime, res, cookie); if (new_it == NULL) { - pthread_mutex_lock(&engine->items.lock); + lock_lru_list(engine, it->slabs_clsid); do_item_unlink(engine, it); - pthread_mutex_unlock(&engine->items.lock); + unlock_lru_list(engine, it->slabs_clsid); return ENGINE_ENOMEM; } memcpy(item_get_data(new_it), buf, res); @@ -812,9 +839,9 @@ void item_release(struct default_engine *engine, hash_item *item) { */ void item_unlink(struct default_engine *engine, hash_item *item) { acquire_key_operation_mutex(engine, item_get_key(item), item->nkey); - pthread_mutex_lock(&engine->items.lock); + lock_lru_list(engine, item->slabs_clsid); do_item_unlink(engine, item); - pthread_mutex_unlock(&engine->items.lock); + unlock_lru_list(engine, item->slabs_clsid); release_key_operation_mutex(engine, item_get_key(item), item->nkey); } @@ -930,7 +957,6 @@ void item_flush_expired(struct default_engine *engine, time_t when) { int i; hash_item *iter, *next; - pthread_mutex_lock(&engine->items.lock); if (when == 0) { engine->config.oldest_live = engine->server.core->get_current_time() - 1; } else { @@ -946,6 +972,7 @@ void item_flush_expired(struct default_engine *engine, time_t when) { * oldest_live time. * The oldest_live checking will auto-expire the remaining items. 
*/ + lock_lru_list(engine, i); for (iter = engine->items.heads[i]; iter != NULL; iter = next) { if (iter->time >= engine->config.oldest_live) { next = iter->next; @@ -957,9 +984,9 @@ void item_flush_expired(struct default_engine *engine, time_t when) { break; } } + unlock_lru_list(engine, i); } } - pthread_mutex_unlock(&engine->items.lock); } /* @@ -971,27 +998,23 @@ char *item_cachedump(struct default_engine *engine, unsigned int *bytes) { char *ret; - pthread_mutex_lock(&engine->items.lock); + lock_lru_list(engine, slabs_clsid); ret = do_item_cachedump(slabs_clsid, limit, bytes); - pthread_mutex_unlock(&engine->items.lock); + unlock_lru_list(engine, slabs_clsid); return ret; } void item_stats(struct default_engine *engine, ADD_STAT add_stat, const void *cookie) { - pthread_mutex_lock(&engine->items.lock); do_item_stats(engine, add_stat, cookie); - pthread_mutex_unlock(&engine->items.lock); } void item_stats_sizes(struct default_engine *engine, ADD_STAT add_stat, const void *cookie) { - pthread_mutex_lock(&engine->items.lock); do_item_stats_sizes(engine, add_stat, cookie); - pthread_mutex_unlock(&engine->items.lock); } static void do_item_link_cursor(struct default_engine *engine, @@ -1073,9 +1096,8 @@ static void item_scrub_class(struct default_engine *engine, ENGINE_ERROR_CODE ret; bool more; do { - pthread_mutex_lock(&engine->items.lock); more = do_item_walk_cursor(engine, cursor, 200, item_scrub, NULL, &ret); - pthread_mutex_unlock(&engine->items.lock); + if (ret != ENGINE_SUCCESS) { break; } @@ -1088,7 +1110,7 @@ static void *item_scubber_main(void *arg) hash_item cursor = { .refcount = 1 }; for (int ii = 0; ii < POWER_LARGEST; ++ii) { - pthread_mutex_lock(&engine->items.lock); + lock_lru_list(engine, ii); bool skip = false; if (engine->items.heads[ii] == NULL) { skip = true; @@ -1096,11 +1118,11 @@ static void *item_scubber_main(void *arg) // add the item at the tail do_item_link_cursor(engine, &cursor, ii); } - pthread_mutex_unlock(&engine->items.lock); if 
(!skip) { item_scrub_class(engine, &cursor); } + unlock_lru_list(engine, ii); } pthread_mutex_lock(&engine->scrubber.lock); @@ -1173,16 +1195,19 @@ static tap_event_t do_item_tap_walker(struct default_engine *engine, client->it = NULL; ENGINE_ERROR_CODE r; + lock_lru_list(engine, client->cursor.slabs_clsid); do { if (!do_item_walk_cursor(engine, &client->cursor, 1, item_tap_iterfunc, client, &r)) { // find next slab class to look at.. bool linked = false; for (int ii = client->cursor.slabs_clsid + 1; ii < POWER_LARGEST && !linked; ++ii) { + lock_lru_list(engine, ii); if (engine->items.heads[ii] != NULL) { // add the item at the tail do_item_link_cursor(engine, &client->cursor, ii); linked = true; } + unlock_lru_list(engine, ii); } if (!linked) { break; @@ -1190,6 +1215,7 @@ static tap_event_t do_item_tap_walker(struct default_engine *engine, } } while (client->it == NULL); *itm = client->it; + unlock_lru_list(engine, client->cursor.slabs_clsid); return (*itm == NULL) ? TAP_DISCONNECT : TAP_MUTATION; } @@ -1202,9 +1228,7 @@ tap_event_t item_tap_walker(ENGINE_HANDLE* handle, { tap_event_t ret; struct default_engine *engine = (struct default_engine*)handle; - pthread_mutex_lock(&engine->items.lock); ret = do_item_tap_walker(engine, cookie, itm, es, nes, ttl, flags, seqno, vbucket); - pthread_mutex_unlock(&engine->items.lock); return ret; } @@ -1221,13 +1245,13 @@ bool initialize_item_tap_walker(struct default_engine *engine, /* Link the cursor! 
*/ bool linked = false; for (int ii = 0; ii < POWER_LARGEST && !linked; ++ii) { - pthread_mutex_lock(&engine->items.lock); + lock_lru_list(engine, ii); if (engine->items.heads[ii] != NULL) { // add the item at the tail do_item_link_cursor(engine, &client->cursor, ii); linked = true; } - pthread_mutex_unlock(&engine->items.lock); + unlock_lru_list(engine, ii); } engine->server.cookie->store_engine_specific(cookie, client); @@ -1236,8 +1260,18 @@ bool initialize_item_tap_walker(struct default_engine *engine, ENGINE_ERROR_CODE items_init(struct default_engine *engine, uint32_t num_threads) { - pthread_mutex_init(&engine->cur_op_recs.lock, NULL); - pthread_cond_init(&engine->cur_op_recs.key_wait_cv, NULL); + if (pthread_mutex_init(&engine->cur_op_recs.lock, NULL)) + return ENGINE_FAILED; + + if (pthread_cond_init(&engine->cur_op_recs.key_wait_cv, NULL)) + return ENGINE_FAILED; + + for (int i = 0; i < POWER_LARGEST; i++) + if (pthread_mutex_init(engine->items.lock + i, NULL)) + return ENGINE_FAILED; + + if (pthread_mutex_init(&engine->items.itemstats_lock, NULL)) + return ENGINE_FAILED; engine->cur_op_recs.keys = calloc(num_threads, sizeof(*engine->cur_op_recs.keys)); if (engine->cur_op_recs.keys == NULL) @@ -1306,6 +1340,7 @@ void items_destroy(struct default_engine *engine) { pthread_mutex_destroy(&engine->cur_op_recs.lock); pthread_cond_destroy(&engine->cur_op_recs.key_wait_cv); + for (int i = 0; i < POWER_LARGEST; i++) pthread_mutex_destroy(engine->items.lock + i); diff --git a/engines/default_engine/items.h b/engines/default_engine/items.h index 62aa1f9..6b02624 100644 --- a/engines/default_engine/items.h +++ b/engines/default_engine/items.h @@ -38,7 +38,8 @@ struct items { hash_item *tails[POWER_LARGEST]; itemstats_t itemstats[POWER_LARGEST]; unsigned int sizes[POWER_LARGEST]; - pthread_mutex_t lock; + pthread_mutex_t lock[POWER_LARGEST]; + pthread_mutex_t itemstats_lock; }; struct cur_key_rec { -- 1.7.1
