Currently, cmap_bucket is protected by a counter.  A cmap reader is
blocked until the counter becomes even: a writer increments it by 1
(making it odd) before a slot update and by 1 again (making it even)
after the update.  If the writer is preempted or scheduled out in the
middle of an update, readers are blocked.

This patch introduces a bitmap as a guard variable for the (hash, node)
pairs.  A writer clears a slot's valid bit before each change and, after
the change, sets the valid bit again and increments the counter.  Thus,
a reader can ensure slot consistency by scanning only the valid slots
and then checking that counter+bitmap has not changed while examining
the bucket.
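
As an illustration, the writer side then looks roughly like the sketch
below (simplified: plain C11 atomics stand in for OVS's atomic_*()
wrappers, a plain assignment to nodes[i] stands in for ovsrcu_set(),
and 'b', 'i', 'node' and 'hash' are assumed to be in scope):

    /* Writer: publish the (hash, node) pair in slot 'i' of bucket 'b'. */
    uint32_t v = atomic_load_explicit(&b->counter_slotValidMap,
                                      memory_order_relaxed);

    /* 1. Clear the slot's valid bit so readers skip this slot. */
    atomic_store_explicit(&b->counter_slotValidMap, v & ~(UINT32_C(1) << i),
                          memory_order_relaxed);

    /* 2. Keep the slot stores from being reordered before the bit clear. */
    atomic_thread_fence(memory_order_release);
    b->nodes[i] = node;
    b->hashes[i] = hash;

    /* 3. Set the valid bit again and bump the counter in one release
     *    store, making the new slot contents visible to readers. */
    atomic_store_explicit(&b->counter_slotValidMap,
                          (v | (UINT32_C(1) << i)) + CMAP_CNT_INC,
                          memory_order_release);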

The only time a reader has to retry the read operation for a single
bucket is when:
1) a writer clears a bit in the valid bitmap between the reader's first
   and second read of counter+bitmap, or
2) a writer increments the counter between the reader's first and
   second read of counter+bitmap.
I.e., the read operation only needs to be retried when some other
thread has made progress (with a write).  There is no spinning or
waiting for other threads to complete, which makes the design
non-blocking for readers.
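
The per-bucket read is then a retry loop roughly like the sketch below
(again simplified with C11 atomics; the real code is cmap_find__()
further down, and ULLONG_FOR_EACH_1() iterates over the bits set in the
slot map):

    const struct cmap_node *node;
    uint32_t c;
    int i;

    do {
        /* Acquire-read the combined counter + valid bitmap. */
        c = atomic_load_explicit(&b->counter_slotValidMap,
                                 memory_order_acquire);

        /* Scan only the slots whose valid bit is set. */
        node = NULL;
        ULLONG_FOR_EACH_1 (i, CMAP_SLOT_MAP(c)) {
            if (b->hashes[i] == hash) {
                node = cmap_node_next(&b->nodes[i]);
                break;
            }
        }

        /* Order the re-read below after the scan above, then retry only
         * if a writer made progress in the meantime. */
        atomic_thread_fence(memory_order_acquire);
    } while (atomic_load_explicit(&b->counter_slotValidMap,
                                  memory_order_relaxed) != c);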

This adds almost no overhead, because the counter and the bitmap share
a single 32-bit variable, so readers and writers need no additional
loads or stores.
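
For example, with CMAP_K == 5 (the 64-bit layout) the two pieces of
state pack into one word as sketched below, using the macros added by
this patch:

    uint32_t w = 0;

    w |= UINT32_C(1) << 3;  /* Mark slot 3 valid.                        */
    w += CMAP_CNT_INC;      /* A single add bumps only the counter part. */

    /* Now CMAP_SLOT_MAP(w) == 0x8 and (w >> CMAP_K) == 1: one 32-bit load
     * or store observes or publishes both the counter and the bitmap. */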

Reviewed-by: Ola Liljedahl <[email protected]>
Reviewed-by: Gavin Hu <[email protected]>
Signed-off-by: Yanqin Wei <[email protected]>
---
 lib/cmap.c | 157 ++++++++++++++++++++++++++++++++++-------------------
 1 file changed, 101 insertions(+), 56 deletions(-)

diff --git a/lib/cmap.c b/lib/cmap.c
index e41c40794..a36efc68c 100644
--- a/lib/cmap.c
+++ b/lib/cmap.c
@@ -123,18 +123,27 @@ COVERAGE_DEFINE(cmap_shrink);
 /* Number of entries per bucket: 7 on 32-bit, 5 on 64-bit for 64B cacheline. */
 #define CMAP_K ((CACHE_LINE_SIZE - 4) / CMAP_ENTRY_SIZE)
 
+/* "counter_slotValidMap" in cmap_bucket includes counter and slot valid map.
+ * Reserve the "CMAP_K" least significant bits for slot valid bitmap and use
+ * remaining bits for bucket counter. Adding 1<<CMAP_K to counter_slotValidMap
+ * means incrementing counter by one. */
+#define CMAP_CNT_INC (1 << CMAP_K)
+#define CMAP_SLOT_MAP(CNT) ((CNT) & ((1 << CMAP_K) - 1))
+
 /* A cuckoo hash bucket.  Designed to be cache-aligned and exactly one cache
  * line long. */
 struct cmap_bucket {
     /* Padding to make cmap_bucket exactly one cache line long. */
     PADDED_MEMBERS(CACHE_LINE_SIZE,
-        /* Allows readers to track in-progress changes.  Initially zero, each
-         * writer increments this value just before and just after each change
-         * (see cmap_set_bucket()).  Thus, a reader can ensure that it gets a
-         * consistent snapshot by waiting for the counter to become even (see
-         * read_even_counter()), then checking that its value does not change
-         * while examining the bucket (see cmap_find()). */
-        atomic_uint32_t counter;
+        /* Bucket counter + slot valid bitmap.
+         * The CMAP_K least significant bits form a bitmap indicating whether
+         * each slot is valid; the (32 - CMAP_K) most significant bits form a
+         * counter.  A writer clears a slot's valid bit before changing the
+         * slot and, after the change, sets the valid bit again and
+         * increments the counter.  Thus, a reader can ensure that it gets a
+         * consistent snapshot by scanning only valid slots and then checking
+         * that (counter + bitmap) does not change while examining the bucket. */
+        atomic_uint32_t counter_slotValidMap;
 
         /* (hash, node) slots.  They are parallel arrays instead of an array of
          * structs to reduce the amount of space lost to padding.
@@ -275,53 +284,59 @@ cmap_is_empty(const struct cmap *cmap)
 }
 
 static inline uint32_t
-read_counter(const struct cmap_bucket *bucket_)
+read_slot_bitmap_protected(const struct cmap_bucket *bucket_)
 {
     struct cmap_bucket *bucket = CONST_CAST(struct cmap_bucket *, bucket_);
-    uint32_t counter;
+    uint32_t counter_slotValidMap;
 
-    atomic_read_explicit(&bucket->counter, &counter, memory_order_acquire);
+    atomic_read_explicit(&bucket->counter_slotValidMap, &counter_slotValidMap,
+                         memory_order_relaxed);
 
-    return counter;
+    return CMAP_SLOT_MAP(counter_slotValidMap);
 }
 
 static inline uint32_t
-read_even_counter(const struct cmap_bucket *bucket)
+read_counter_slot_bitmap(const struct cmap_bucket *bucket_)
 {
-    uint32_t counter;
+    struct cmap_bucket *bucket = CONST_CAST(struct cmap_bucket *, bucket_);
+    uint32_t counter_slotValidMap;
 
-    do {
-        counter = read_counter(bucket);
-    } while (OVS_UNLIKELY(counter & 1));
+    /* Both the counter and the slot valid map change on every slot update,
+     * so counter_slotValidMap can be used directly to track changes. */
+    atomic_read_explicit(&bucket->counter_slotValidMap, &counter_slotValidMap,
+                         memory_order_acquire);
 
-    return counter;
+    return counter_slotValidMap;
 }
 
 static inline bool
-counter_changed(const struct cmap_bucket *b_, uint32_t c)
+bucket_changed(const struct cmap_bucket *b_, uint32_t c)
 {
     struct cmap_bucket *b = CONST_CAST(struct cmap_bucket *, b_);
-    uint32_t counter;
+    uint32_t counter_slotValidMap;
 
     /* Need to make sure the counter read is not moved up, before the hash and
-     * cmap_node_next().  Using atomic_read_explicit with memory_order_acquire
-     * would allow prior reads to be moved after the barrier.
-     * atomic_thread_fence prevents all following memory accesses from moving
-     * prior to preceding loads. */
+     * cmap_node_next().
+     * atomic_thread_fence with acquire memory ordering prevents all following
+     * memory accesses from moving prior to preceding loads. */
     atomic_thread_fence(memory_order_acquire);
-    atomic_read_relaxed(&b->counter, &counter);
+    atomic_read_relaxed(&b->counter_slotValidMap, &counter_slotValidMap);
 
-    return OVS_UNLIKELY(counter != c);
+    return OVS_UNLIKELY(counter_slotValidMap != c);
 }
 
 static inline const struct cmap_node *
-cmap_find_in_bucket(const struct cmap_bucket *bucket, uint32_t hash)
+cmap_find_in_bucket(const struct cmap_bucket *bucket, uint32_t hash,
+                    uint32_t slot_map)
 {
-    for (int i = 0; i < CMAP_K; i++) {
+    int i;
+
+    ULLONG_FOR_EACH_1 (i, slot_map) {
         if (bucket->hashes[i] == hash) {
             return cmap_node_next(&bucket->nodes[i]);
         }
     }
+
     return NULL;
 }
 
@@ -334,20 +349,23 @@ cmap_find__(const struct cmap_bucket *b1, const struct cmap_bucket *b2,
 
     do {
         do {
-            c1 = read_even_counter(b1);
-            node = cmap_find_in_bucket(b1, hash);
-        } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+            c1 = read_counter_slot_bitmap(b1);
+            node = cmap_find_in_bucket(b1, hash, CMAP_SLOT_MAP(c1));
+        } while (OVS_UNLIKELY(bucket_changed(b1, c1)));
+
         if (node) {
             break;
         }
+
         do {
-            c2 = read_even_counter(b2);
-            node = cmap_find_in_bucket(b2, hash);
-        } while (OVS_UNLIKELY(counter_changed(b2, c2)));
+            c2 = read_counter_slot_bitmap(b2);
+            node = cmap_find_in_bucket(b2, hash, CMAP_SLOT_MAP(c2));
+        } while (OVS_UNLIKELY(bucket_changed(b2, c2)));
+
         if (node) {
             break;
         }
-    } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+    } while (OVS_UNLIKELY(bucket_changed(b1, c1)));
 
     return node;
 }
@@ -418,31 +436,37 @@ cmap_find_index(const struct cmap *cmap, uint32_t hash)
     const struct cmap_bucket *b1 = &impl->buckets[b_index1];
     const struct cmap_bucket *b2 = &impl->buckets[b_index2];
 
+    int i;
+
     do {
         do {
-            c1 = read_even_counter(b1);
-            for (int i = 0; i < CMAP_K; i++) {
+            c1 = read_counter_slot_bitmap(b1);
+
+            ULLONG_FOR_EACH_1 (i, CMAP_SLOT_MAP(c1)) {
                 if (b1->hashes[i] == hash) {
                     index = b_index1 * CMAP_K + i;
                  }
             }
-        } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+        } while (OVS_UNLIKELY(bucket_changed(b1, c1)));
+
         if (index != UINT32_MAX) {
             break;
         }
+
         do {
-            c2 = read_even_counter(b2);
-            for (int i = 0; i < CMAP_K; i++) {
+            c2 = read_counter_slot_bitmap(b2);
+
+            ULLONG_FOR_EACH_1 (i, CMAP_SLOT_MAP(c2)) {
                 if (b2->hashes[i] == hash) {
                     index = b_index2 * CMAP_K + i;
                 }
             }
-        } while (OVS_UNLIKELY(counter_changed(b2, c2)));
+        } while (OVS_UNLIKELY(bucket_changed(b2, c2)));
 
         if (index != UINT32_MAX) {
             break;
         }
-    } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+    } while (OVS_UNLIKELY(bucket_changed(b1, c1)));
 
     return index;
 }
@@ -480,9 +504,9 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
         const struct cmap_node *node;
 
         do {
-            c1 = read_even_counter(b1);
-            node = cmap_find_in_bucket(b1, hashes[i]);
-        } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+            c1 = read_counter_slot_bitmap(b1);
+            node = cmap_find_in_bucket(b1, hashes[i], CMAP_SLOT_MAP(c1));
+        } while (OVS_UNLIKELY(bucket_changed(b1, c1)));
 
         if (!node) {
             /* Not found (yet); Prefetch the 2nd bucket. */
@@ -503,9 +527,9 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
         const struct cmap_node *node;
 
         do {
-            c2 = read_even_counter(b2);
-            node = cmap_find_in_bucket(b2, hashes[i]);
-        } while (OVS_UNLIKELY(counter_changed(b2, c2)));
+            c2 = read_counter_slot_bitmap(b2);
+            node = cmap_find_in_bucket(b2, hashes[i], CMAP_SLOT_MAP(c2));
+        } while (OVS_UNLIKELY(bucket_changed(b2, c2)));
 
         if (!node) {
             /* Not found, but the node may have been moved from b2 to b1 right
@@ -516,7 +540,7 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
              * need to loop as long as it takes to get stable readings of
              * both buckets.  cmap_find__() does that, and now that we have
              * fetched both buckets we can just use it. */
-            if (OVS_UNLIKELY(counter_changed(b1s[i], c1s[i]))) {
+            if (OVS_UNLIKELY(bucket_changed(b1s[i], c1s[i]))) {
                 node = cmap_find__(b1s[i], b2s[i], hashes[i]);
                 if (node) {
                     goto found;
@@ -537,8 +561,11 @@ static int
 cmap_find_slot_protected(struct cmap_bucket *b, uint32_t hash)
 {
     int i;
+    uint32_t slot_map;
 
-    for (i = 0; i < CMAP_K; i++) {
+    slot_map = read_slot_bitmap_protected(b);
+
+    ULLONG_FOR_EACH_1 (i, slot_map) {
         if (b->hashes[i] == hash && cmap_node_next_protected(&b->nodes[i])) {
             return i;
         }
@@ -551,8 +578,11 @@ cmap_find_bucket_protected(struct cmap_impl *impl, uint32_t hash, uint32_t h)
 {
     struct cmap_bucket *b = &impl->buckets[h & impl->mask];
     int i;
+    uint32_t slot_map;
 
-    for (i = 0; i < CMAP_K; i++) {
+    slot_map = read_slot_bitmap_protected(b);
+
+    ULLONG_FOR_EACH_1 (i, slot_map) {
         if (b->hashes[i] == hash) {
             return cmap_node_next_protected(&b->nodes[i]);
         }
@@ -595,15 +625,27 @@ static void
 cmap_set_bucket(struct cmap_bucket *b, int i,
                 struct cmap_node *node, uint32_t hash)
 {
-    uint32_t c;
+    uint32_t counter_slotValidMap;
+
+    atomic_read_explicit(&b->counter_slotValidMap, &counter_slotValidMap,
+                         memory_order_relaxed);
+    /* Clear the bit in the valid bitmap before updating the slot. */
+    counter_slotValidMap &= ~(UINT32_C(1) << i);
+    atomic_store_explicit(&b->counter_slotValidMap, counter_slotValidMap,
+                         memory_order_relaxed);
 
-    atomic_read_explicit(&b->counter, &c, memory_order_acquire);
-    atomic_store_explicit(&b->counter, c + 1, memory_order_relaxed);
-    /*need to make sure setting hash is not moved up before counter update*/
+    /* Need to make sure setting the hash is not moved up before clearing
+     * the slot valid bit. */
     atomic_thread_fence(memory_order_release);
     ovsrcu_set(&b->nodes[i].next, node); /* Also atomic. */
     b->hashes[i] = hash;
-    atomic_store_explicit(&b->counter, c + 2, memory_order_release);
+
+    /* Set the bit in the valid bitmap after updating the slot. */
+    counter_slotValidMap |= (UINT32_C(1) << i);
+    /* Increment the counter to notify readers. */
+    counter_slotValidMap += CMAP_CNT_INC;
+    atomic_store_explicit(&b->counter_slotValidMap, counter_slotValidMap,
+                         memory_order_release);
 }
 
 /* Searches 'b' for a node with the given 'hash'.  If it finds one, adds
@@ -614,8 +656,11 @@ cmap_insert_dup(struct cmap_node *new_node, uint32_t hash,
                 struct cmap_bucket *b)
 {
     int i;
+    uint32_t slot_map;
 
-    for (i = 0; i < CMAP_K; i++) {
+    slot_map = read_slot_bitmap_protected(b);
+
+    ULLONG_FOR_EACH_1 (i, slot_map) {
         if (b->hashes[i] == hash) {
             struct cmap_node *node = cmap_node_next_protected(&b->nodes[i]);
 
-- 
2.17.1
