Paolo Bonzini writes:

> On 25/10/2016 17:49, Pranith Kumar wrote:
>> But we are taking the seqlock of only the head bucket, while the
>> readers are reading hashes/pointers of the chained buckets.
>
> No, we aren't. See qht_lookup__slowpath.
I don't see it. The reader takes the seqlock of the head bucket in
qht_lookup__slowpath() and then iterates over the chained buckets in
qht_do_lookup(). The writer does the same: it takes the head bucket's
seqlock in qht_insert__locked(). I've written a patch (see below) that
takes the per-bucket sequence locks instead.

> This patch:
>
> throughput        base      patch     %change
> update
>  0                8.07      13.33     +65%
> 10                7.10       8.90     +25%
> 20                6.34       7.02     +10%
> 30                5.48       6.11     +9.6%
> 40                4.90       5.46     +11.42%
>
> Just doubling the cachesize:
>
> throughput        base      patch     %change
> update
>  0                8.07      4.47      -45% ?!?
> 10                7.10      9.82      +38%
> 20                6.34      8.13      +28%
> 30                5.48      7.13      +30%
> 40                5.90      6.45      +30%
>
> It seems to me that your machine has 128-byte cachelines.

Nope, it is just the regular 64-byte cache line:

$ getconf LEVEL1_DCACHE_LINESIZE
64

(The machine model is a Xeon CPU E5-2620.)
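Before the patch itself, here is a minimal standalone sketch of the
protocol it switches to. To be clear, this is not the QEMU code: the
seqlock is a toy C11 rendering of the one in include/qemu/seqlock.h,
the bucket is cut down to a single entry, and bucket_lookup() is a
hypothetical stand-in for qht_do_lookup():

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

struct seqlock {
    atomic_uint sequence;          /* even: idle, odd: write in progress */
};

static unsigned seqlock_read_begin(struct seqlock *sl)
{
    unsigned seq;

    /* Wait out any writer: an odd count means a write is in flight. */
    do {
        seq = atomic_load_explicit(&sl->sequence, memory_order_acquire);
    } while (seq & 1);
    return seq;
}

static bool seqlock_read_retry(struct seqlock *sl, unsigned seq)
{
    atomic_thread_fence(memory_order_acquire);           /* like smp_rmb() */
    return atomic_load_explicit(&sl->sequence, memory_order_relaxed) != seq;
}

/* Writers are serialized externally (qht's per-bucket spinlock). */
static void seqlock_write_begin(struct seqlock *sl)
{
    atomic_fetch_add_explicit(&sl->sequence, 1, memory_order_relaxed);
    atomic_thread_fence(memory_order_release);           /* like smp_wmb() */
}

static void seqlock_write_end(struct seqlock *sl)
{
    atomic_thread_fence(memory_order_release);           /* like smp_wmb() */
    atomic_fetch_add_explicit(&sl->sequence, 1, memory_order_relaxed);
}

/* One entry per bucket keeps the sketch short; qht has four per bucket. */
struct bucket {
    struct seqlock sequence;       /* per-bucket, as in the patch below */
    _Atomic(void *) pointer;
    struct bucket *_Atomic next;
};

/*
 * Per-bucket read side: begin/retry brackets only the bucket being
 * examined, so a writer updating some other bucket in the chain does
 * not force this reader to restart from the head.
 */
static void *bucket_lookup(struct bucket *b, const void *key)
{
    while (b) {
        unsigned version = seqlock_read_begin(&b->sequence);
        void *p = atomic_load_explicit(&b->pointer, memory_order_relaxed);

        if (p == key) {
            if (!seqlock_read_retry(&b->sequence, version)) {
                return p;
            }
            continue;          /* a writer raced us: re-read this bucket */
        }
        b = atomic_load_explicit(&b->next, memory_order_relaxed);
    }
    return NULL;
}

int main(void)
{
    static int key;
    struct bucket b2 = { .pointer = &key };
    struct bucket b1 = { .next = &b2 };

    /* Write side: bump the chained bucket's own sequence, not the head's. */
    seqlock_write_begin(&b2.sequence);
    /* ... update the bucket's hashes/pointers here ... */
    seqlock_write_end(&b2.sequence);

    /* Single-threaded smoke test: the key sits in the chained bucket. */
    printf("%s\n", bucket_lookup(&b1, &key) == &key ? "found" : "missing");
    return 0;
}

The point is the placement of seqlock_read_begin()/seqlock_read_retry():
they bracket one bucket at a time, so a writer bumping one bucket's
sequence no longer invalidates readers of the rest of the chain. The
actual patch follows.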
Take the per-bucket sequence locks instead of the head bucket lock.

Signed-off-by: Pranith Kumar <bobby.pr...@gmail.com>
---
 util/qht.c | 36 ++++++++++++++++++------------------
 1 file changed, 18 insertions(+), 18 deletions(-)

diff --git a/util/qht.c b/util/qht.c
index 4d82609..cfce5fc 100644
--- a/util/qht.c
+++ b/util/qht.c
@@ -374,19 +374,19 @@ static void qht_bucket_reset__locked(struct qht_bucket *head)
     struct qht_bucket *b = head;
     int i;
 
-    seqlock_write_begin(&head->sequence);
     do {
+        seqlock_write_begin(&b->sequence);
         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
             if (b->pointers[i] == NULL) {
-                goto done;
+                seqlock_write_end(&b->sequence);
+                return;
             }
             atomic_set(&b->hashes[i], 0);
             atomic_set(&b->pointers[i], NULL);
         }
+        seqlock_write_end(&b->sequence);
         b = b->next;
     } while (b);
- done:
-    seqlock_write_end(&head->sequence);
 }
 
 /* call with all bucket locks held */
@@ -446,6 +446,8 @@ void *qht_do_lookup(struct qht_bucket *head, qht_lookup_func_t func,
     int i;
 
     do {
+        void *q = NULL;
+        unsigned int version = seqlock_read_begin(&b->sequence);
         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
             if (atomic_read(&b->hashes[i]) == hash) {
                 /* The pointer is dereferenced before seqlock_read_retry,
@@ -455,11 +457,16 @@ void *qht_do_lookup(struct qht_bucket *head, qht_lookup_func_t func,
                 void *p = atomic_rcu_read(&b->pointers[i]);
 
                 if (likely(p) && likely(func(p, userp))) {
-                    return p;
+                    q = p;
+                    break;
                 }
             }
         }
-        b = atomic_rcu_read(&b->next);
+        if (!q) {
+            b = atomic_rcu_read(&b->next);
+        } else if (!seqlock_read_retry(&b->sequence, version)) {
+            return q;
+        }
     } while (b);
 
     return NULL;
@@ -469,14 +476,7 @@ static __attribute__((noinline))
 void *qht_lookup__slowpath(struct qht_bucket *b, qht_lookup_func_t func,
                            const void *userp, uint32_t hash)
 {
-    unsigned int version;
-    void *ret;
-
-    do {
-        version = seqlock_read_begin(&b->sequence);
-        ret = qht_do_lookup(b, func, userp, hash);
-    } while (seqlock_read_retry(&b->sequence, version));
-    return ret;
+    return qht_do_lookup(b, func, userp, hash);
 }
 
 void *qht_lookup(struct qht *ht, qht_lookup_func_t func, const void *userp,
@@ -537,14 +537,14 @@ static bool qht_insert__locked(struct qht *ht, struct qht_map *map,
 
  found:
     /* found an empty key: acquire the seqlock and write */
-    seqlock_write_begin(&head->sequence);
+    seqlock_write_begin(&b->sequence);
     if (new) {
         atomic_rcu_set(&prev->next, b);
     }
     /* smp_wmb() implicit in seqlock_write_begin.  */
     atomic_set(&b->hashes[i], hash);
     atomic_set(&b->pointers[i], p);
-    seqlock_write_end(&head->sequence);
+    seqlock_write_end(&b->sequence);
 
     return true;
 }
@@ -665,9 +665,9 @@ bool qht_remove__locked(struct qht_map *map, struct qht_bucket *head,
         }
         if (q == p) {
             qht_debug_assert(b->hashes[i] == hash);
-            seqlock_write_begin(&head->sequence);
+            seqlock_write_begin(&b->sequence);
             qht_bucket_remove_entry(b, i);
-            seqlock_write_end(&head->sequence);
+            seqlock_write_end(&b->sequence);
             return true;
         }
     }
-- 
2.10.1