Currently, a cmap_bucket is protected by a counter.  A reader waits until the
counter becomes even before scanning the bucket; a writer increments the
counter by 1 (making it odd) before a change and by 1 again (making it even)
afterwards.  If the writer is preempted or scheduled out in the middle of an
update, readers are blocked.
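For reference, the existing reader path boils down to the following (a
condensed sketch based on the read_even_counter()/counter_changed() code that
this patch removes below):

    /* Old scheme (simplified): spin until the counter is even, scan the
     * bucket, and retry if the counter changed while scanning. */
    do {
        do {
            c1 = read_counter(b1);          /* acquire load of b1->counter */
        } while (OVS_UNLIKELY(c1 & 1));     /* odd: writer in progress, wait */
        node = cmap_find_in_bucket(b1, hash);
    } while (OVS_UNLIKELY(counter_changed(b1, c1)));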
This patch introduces a bitmap that guards each bucket's (hash, node) pairs.
The writer clears a slot's valid bit before each change; after the change, it
sets the valid bit again and increments the counter.  A reader can therefore
get a consistent snapshot by scanning only the valid slots and then checking
that the combined counter+bitmap does not change while it examines the
bucket.  The only cases in which a reader has to retry the read of a single
bucket are:

1) a writer clears a bit in the valid bitmap between the reader's first and
   second read of the counter+bitmap, or
2) a writer increments the counter between the reader's first and second
   read of the counter+bitmap.

In other words, the read only needs to be retried when some other thread has
made progress (with a write); there is no spinning or waiting for other
threads to complete.  This makes the design non-blocking for readers.  It
adds almost no overhead, because the counter and the bitmap share a single
32-bit variable, so readers and writers issue no additional loads or stores.
(A short worked example of the counter/bitmap layout follows the patch.)

Reviewed-by: Ola Liljedahl <[email protected]>
Reviewed-by: Gavin Hu <[email protected]>
Signed-off-by: Yanqin Wei <[email protected]>
---
 lib/cmap.c | 157 ++++++++++++++++++++++++++++++++++-------------------
 1 file changed, 101 insertions(+), 56 deletions(-)

diff --git a/lib/cmap.c b/lib/cmap.c
index e41c40794..a36efc68c 100644
--- a/lib/cmap.c
+++ b/lib/cmap.c
@@ -123,18 +123,27 @@ COVERAGE_DEFINE(cmap_shrink);
 /* Number of entries per bucket: 7 on 32-bit, 5 on 64-bit for 64B cacheline. */
 #define CMAP_K ((CACHE_LINE_SIZE - 4) / CMAP_ENTRY_SIZE)
 
+/* "counter_slotValidMap" in cmap_bucket combines the bucket counter and the
+ * slot valid map.  The CMAP_K least significant bits are reserved for the
+ * slot valid bitmap and the remaining bits for the bucket counter, so adding
+ * 1 << CMAP_K to counter_slotValidMap increments the counter by one. */
+#define CMAP_CNT_INC (1 << CMAP_K)
+#define CMAP_SLOT_MAP(CNT) (CNT & ((1 << CMAP_K) - 1))
+
 /* A cuckoo hash bucket.  Designed to be cache-aligned and exactly one cache
  * line long. */
 struct cmap_bucket {
     /* Padding to make cmap_bucket exactly one cache line long. */
     PADDED_MEMBERS(CACHE_LINE_SIZE,
-        /* Allows readers to track in-progress changes.  Initially zero, each
-         * writer increments this value just before and just after each change
-         * (see cmap_set_bucket()).  Thus, a reader can ensure that it gets a
-         * consistent snapshot by waiting for the counter to become even (see
-         * read_even_counter()), then checking that its value does not change
-         * while examining the bucket (see cmap_find()). */
-        atomic_uint32_t counter;
+        /* Bucket counter + slot valid bitmap.
+         * The CMAP_K least significant bits are a bitmap indicating which
+         * slots are valid; the (32 - CMAP_K) most significant bits are a
+         * counter.  A writer clears a slot's valid bit before each change
+         * and, after the change, sets the valid bit again and increments
+         * the counter.  Thus, a reader can get a consistent snapshot by
+         * scanning only the valid slots and then checking that the combined
+         * counter + bitmap does not change while examining the bucket. */
+        atomic_uint32_t counter_slotValidMap;
 
         /* (hash, node) slots.  They are parallel arrays instead of an array of
          * structs to reduce the amount of space lost to padding.
@@ -275,53 +284,59 @@ cmap_is_empty(const struct cmap *cmap)
 }
 
 static inline uint32_t
-read_counter(const struct cmap_bucket *bucket_)
+read_slot_bitmap_protected(const struct cmap_bucket *bucket_)
 {
     struct cmap_bucket *bucket = CONST_CAST(struct cmap_bucket *, bucket_);
-    uint32_t counter;
+    uint32_t counter_slotValidMap;
 
-    atomic_read_explicit(&bucket->counter, &counter, memory_order_acquire);
+    atomic_read_explicit(&bucket->counter_slotValidMap, &counter_slotValidMap,
+                         memory_order_relaxed);
 
-    return counter;
+    return CMAP_SLOT_MAP(counter_slotValidMap);
 }
 
 static inline uint32_t
-read_even_counter(const struct cmap_bucket *bucket)
+read_counter_slot_bitmap(const struct cmap_bucket *bucket_)
 {
-    uint32_t counter;
+    struct cmap_bucket *bucket = CONST_CAST(struct cmap_bucket *, bucket_);
+    uint32_t counter_slotValidMap;
 
-    do {
-        counter = read_counter(bucket);
-    } while (OVS_UNLIKELY(counter & 1));
+    /* Both the counter and the slot valid map change on every slot update,
+     * so counter_slotValidMap can be used directly to track updates. */
+    atomic_read_explicit(&bucket->counter_slotValidMap, &counter_slotValidMap,
+                         memory_order_acquire);
 
-    return counter;
+    return counter_slotValidMap;
 }
 
 static inline bool
-counter_changed(const struct cmap_bucket *b_, uint32_t c)
+bucket_changed(const struct cmap_bucket *b_, uint32_t c)
 {
     struct cmap_bucket *b = CONST_CAST(struct cmap_bucket *, b_);
-    uint32_t counter;
+    uint32_t counter_slotValidMap;
 
     /* Need to make sure the counter read is not moved up, before the hash and
-     * cmap_node_next().  Using atomic_read_explicit with memory_order_acquire
-     * would allow prior reads to be moved after the barrier.
-     * atomic_thread_fence prevents all following memory accesses from moving
-     * prior to preceding loads. */
+     * cmap_node_next().  atomic_thread_fence with acquire memory ordering
+     * prevents all following memory accesses from moving prior to the
+     * preceding loads. */
     atomic_thread_fence(memory_order_acquire);
-    atomic_read_relaxed(&b->counter, &counter);
+    atomic_read_relaxed(&b->counter_slotValidMap, &counter_slotValidMap);
 
-    return OVS_UNLIKELY(counter != c);
+    return OVS_UNLIKELY(counter_slotValidMap != c);
 }
 
 static inline const struct cmap_node *
-cmap_find_in_bucket(const struct cmap_bucket *bucket, uint32_t hash)
+cmap_find_in_bucket(const struct cmap_bucket *bucket, uint32_t hash,
+                    uint32_t slot_map)
 {
-    for (int i = 0; i < CMAP_K; i++) {
+    int i;
+
+    ULLONG_FOR_EACH_1 (i, slot_map) {
         if (bucket->hashes[i] == hash) {
             return cmap_node_next(&bucket->nodes[i]);
         }
     }
+
     return NULL;
 }
@@ -334,20 +349,23 @@ cmap_find__(const struct cmap_bucket *b1, const struct cmap_bucket *b2,
 
     do {
         do {
-            c1 = read_even_counter(b1);
-            node = cmap_find_in_bucket(b1, hash);
-        } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+            c1 = read_counter_slot_bitmap(b1);
+            node = cmap_find_in_bucket(b1, hash, CMAP_SLOT_MAP(c1));
+        } while (OVS_UNLIKELY(bucket_changed(b1, c1)));
+
         if (node) {
             break;
         }
+
         do {
-            c2 = read_even_counter(b2);
-            node = cmap_find_in_bucket(b2, hash);
-        } while (OVS_UNLIKELY(counter_changed(b2, c2)));
+            c2 = read_counter_slot_bitmap(b2);
+            node = cmap_find_in_bucket(b2, hash, CMAP_SLOT_MAP(c2));
+        } while (OVS_UNLIKELY(bucket_changed(b2, c2)));
+
         if (node) {
             break;
         }
-    } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+    } while (OVS_UNLIKELY(bucket_changed(b1, c1)));
 
     return node;
 }
@@ -418,31 +436,37 @@ cmap_find_index(const struct cmap *cmap, uint32_t hash)
     const struct cmap_bucket *b1 = &impl->buckets[b_index1];
     const struct cmap_bucket *b2 = &impl->buckets[b_index2];
 
+    int i;
+
     do {
         do {
-            c1 = read_even_counter(b1);
-            for (int i = 0; i < CMAP_K; i++) {
+            c1 = read_counter_slot_bitmap(b1);
+
+            ULLONG_FOR_EACH_1 (i, CMAP_SLOT_MAP(c1)) {
                 if (b1->hashes[i] == hash) {
                     index = b_index1 * CMAP_K + i;
                 }
             }
-        } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+        } while (OVS_UNLIKELY(bucket_changed(b1, c1)));
+
         if (index != UINT32_MAX) {
             break;
         }
+
         do {
-            c2 = read_even_counter(b2);
-            for (int i = 0; i < CMAP_K; i++) {
+            c2 = read_counter_slot_bitmap(b2);
+
+            ULLONG_FOR_EACH_1 (i, CMAP_SLOT_MAP(c2)) {
                 if (b2->hashes[i] == hash) {
                     index = b_index2 * CMAP_K + i;
                 }
             }
-        } while (OVS_UNLIKELY(counter_changed(b2, c2)));
+        } while (OVS_UNLIKELY(bucket_changed(b2, c2)));
 
         if (index != UINT32_MAX) {
             break;
         }
-    } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+    } while (OVS_UNLIKELY(bucket_changed(b1, c1)));
 
     return index;
 }
@@ -480,9 +504,9 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
         const struct cmap_node *node;
 
         do {
-            c1 = read_even_counter(b1);
-            node = cmap_find_in_bucket(b1, hashes[i]);
-        } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+            c1 = read_counter_slot_bitmap(b1);
+            node = cmap_find_in_bucket(b1, hashes[i], CMAP_SLOT_MAP(c1));
+        } while (OVS_UNLIKELY(bucket_changed(b1, c1)));
         if (!node) {
             /* Not found (yet); Prefetch the 2nd bucket.
             */
@@ -503,9 +527,9 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
         const struct cmap_node *node;
 
         do {
-            c2 = read_even_counter(b2);
-            node = cmap_find_in_bucket(b2, hashes[i]);
-        } while (OVS_UNLIKELY(counter_changed(b2, c2)));
+            c2 = read_counter_slot_bitmap(b2);
+            node = cmap_find_in_bucket(b2, hashes[i], CMAP_SLOT_MAP(c2));
+        } while (OVS_UNLIKELY(bucket_changed(b2, c2)));
 
         if (!node) {
             /* Not found, but the node may have been moved from b2 to b1 right
@@ -516,7 +540,7 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
              * need to loop as long as it takes to get stable readings of
              * both buckets.  cmap_find__() does that, and now that we have
              * fetched both buckets we can just use it. */
-            if (OVS_UNLIKELY(counter_changed(b1s[i], c1s[i]))) {
+            if (OVS_UNLIKELY(bucket_changed(b1s[i], c1s[i]))) {
                 node = cmap_find__(b1s[i], b2s[i], hashes[i]);
                 if (node) {
                     goto found;
@@ -537,8 +561,11 @@ static int
 cmap_find_slot_protected(struct cmap_bucket *b, uint32_t hash)
 {
     int i;
+    uint32_t slot_map;
 
-    for (i = 0; i < CMAP_K; i++) {
+    slot_map = read_slot_bitmap_protected(b);
+
+    ULLONG_FOR_EACH_1 (i, slot_map) {
         if (b->hashes[i] == hash && cmap_node_next_protected(&b->nodes[i])) {
             return i;
         }
@@ -551,8 +578,11 @@ cmap_find_bucket_protected(struct cmap_impl *impl, uint32_t hash, uint32_t h)
 {
     struct cmap_bucket *b = &impl->buckets[h & impl->mask];
     int i;
+    uint32_t slot_map;
 
-    for (i = 0; i < CMAP_K; i++) {
+    slot_map = read_slot_bitmap_protected(b);
+
+    ULLONG_FOR_EACH_1 (i, slot_map) {
         if (b->hashes[i] == hash) {
             return cmap_node_next_protected(&b->nodes[i]);
         }
@@ -595,15 +625,27 @@ static void
 cmap_set_bucket(struct cmap_bucket *b, int i, struct cmap_node *node,
                 uint32_t hash)
 {
-    uint32_t c;
+    uint32_t counter_slotValidMap;
+
+    atomic_read_explicit(&b->counter_slotValidMap, &counter_slotValidMap,
+                         memory_order_relaxed);
+    /* Clear the bit in the valid bitmap before updating the slot. */
+    counter_slotValidMap &= ~(UINT32_C(1) << i);
+    atomic_store_explicit(&b->counter_slotValidMap, counter_slotValidMap,
+                          memory_order_relaxed);
 
-    atomic_read_explicit(&b->counter, &c, memory_order_acquire);
-    atomic_store_explicit(&b->counter, c + 1, memory_order_relaxed);
-    /*need to make sure setting hash is not moved up before counter update*/
+    /* Need to make sure that setting the hash is not reordered before
+     * clearing the slot valid bit. */
     atomic_thread_fence(memory_order_release);
     ovsrcu_set(&b->nodes[i].next, node); /* Also atomic. */
     b->hashes[i] = hash;
-    atomic_store_explicit(&b->counter, c + 2, memory_order_release);
+
+    /* Set the bit in the valid bitmap after updating the slot. */
+    counter_slotValidMap |= (UINT32_C(1) << i);
+    /* Increment the counter to notify readers. */
+    counter_slotValidMap += CMAP_CNT_INC;
+    atomic_store_explicit(&b->counter_slotValidMap, counter_slotValidMap,
+                          memory_order_release);
 }
 
 /* Searches 'b' for a node with the given 'hash'.  If it finds one, adds
@@ -614,8 +656,11 @@ cmap_insert_dup(struct cmap_node *new_node, uint32_t hash,
                 struct cmap_bucket *b)
 {
     int i;
+    uint32_t slot_map;
 
-    for (i = 0; i < CMAP_K; i++) {
+    slot_map = read_slot_bitmap_protected(b);
+
+    ULLONG_FOR_EACH_1 (i, slot_map) {
         if (b->hashes[i] == hash) {
             struct cmap_node *node = cmap_node_next_protected(&b->nodes[i]);
 
-- 
2.17.1
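To make the new layout concrete, here is a small worked example (a sketch for
illustration only, not part of the patch, assuming a 64-bit build where
CMAP_K is 5 as noted in the comment above):

    /* With CMAP_K == 5, bits 0..4 of counter_slotValidMap are the per-slot
     * valid bits and bits 5..31 are the bucket counter:
     *
     *   CMAP_CNT_INC        == 1 << 5       == 0x20
     *   CMAP_SLOT_MAP(0x47) == 0x47 & 0x1f  == 0x07   (slots 0-2 valid)
     *
     * A writer updating slot 1 (as in cmap_set_bucket()) goes:
     *   0x47 -> 0x45 (clear bit 1) -> update slot -> set bit 1, add 0x20
     *        -> 0x67 (same slot map, counter bumped by one)
     *
     * A reader that sampled 0x47 before the update reads back 0x45 or 0x67
     * afterwards and retries; if it still reads 0x47, its snapshot was
     * consistent. */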
