Signed-off-by: Bill Fischofer <[email protected]>

Improve buffer allocation by extending tag length to avoid ABA issues in
lockless allocation, and add support for lock-based allocation to permit
comparative performance testing. This resolves Bug 1051.

---

This patch is designed to resolve all open buffer allocation issues in two
ways. By switching from pointers to offsets, the tag bit length is greatly
extended to eliminate ABA conditions in lockless operation. As a result,
this also resolves Bug 1051.

Because in the presence of local buffer caching it is not clear whether
lockless or lock-based allocation will scale better, this patch adds
compile-time support for selecting which sychronization mechanism to use.
By default lockless allocation is used. To enable lock-based allocation
change the USE_BUFFER_POOL_LOCKS define to 1.

 .../linux-generic/include/odp_buffer_internal.h    |  78 ++++++--
 .../include/odp_buffer_pool_internal.h             | 209 +++++++++++++++++----
 platform/linux-generic/odp_buffer_pool.c           |  19 +-
 3 files changed, 251 insertions(+), 55 deletions(-)

diff --git a/platform/linux-generic/include/odp_buffer_internal.h 
b/platform/linux-generic/include/odp_buffer_internal.h
index 39b0b05..b8faf63 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -29,25 +29,67 @@ extern "C" {
 #include <odp_byteorder.h>
 #include <odp_thread.h>
 
-
 #define ODP_BITSIZE(x) \
-       ((x) <=     2 ?  1 : \
-       ((x) <=     4 ?  2 : \
-       ((x) <=     8 ?  3 : \
-       ((x) <=    16 ?  4 : \
-       ((x) <=    32 ?  5 : \
-       ((x) <=    64 ?  6 : \
-       ((x) <=   128 ?  7 : \
-       ((x) <=   256 ?  8 : \
-       ((x) <=   512 ?  9 : \
-       ((x) <=  1024 ? 10 : \
-       ((x) <=  2048 ? 11 : \
-       ((x) <=  4096 ? 12 : \
-       ((x) <=  8196 ? 13 : \
-       ((x) <= 16384 ? 14 : \
-       ((x) <= 32768 ? 15 : \
-       ((x) <= 65536 ? 16 : \
-        (0/0)))))))))))))))))
+       ((x) <= (1ULL <<  1) ?  1 : \
+       ((x) <= (1ULL <<  2) ?  2 : \
+       ((x) <= (1ULL <<  3) ?  3 : \
+       ((x) <= (1ULL <<  4) ?  4 : \
+       ((x) <= (1ULL <<  5) ?  5 : \
+       ((x) <= (1ULL <<  6) ?  6 : \
+       ((x) <= (1ULL <<  7) ?  7 : \
+       ((x) <= (1ULL <<  8) ?  8 : \
+       ((x) <= (1ULL <<  9) ?  9 : \
+       ((x) <= (1ULL << 10) ? 10 : \
+       ((x) <= (1ULL << 11) ? 11 : \
+       ((x) <= (1ULL << 12) ? 12 : \
+       ((x) <= (1ULL << 13) ? 13 : \
+       ((x) <= (1ULL << 14) ? 14 : \
+       ((x) <= (1ULL << 15) ? 15 : \
+       ((x) <= (1ULL << 16) ? 16 : \
+       ((x) <= (1ULL << 17) ? 17 : \
+       ((x) <= (1ULL << 18) ? 18 : \
+       ((x) <= (1ULL << 19) ? 19 : \
+       ((x) <= (1ULL << 20) ? 20 : \
+       ((x) <= (1ULL << 21) ? 21 : \
+       ((x) <= (1ULL << 22) ? 22 : \
+       ((x) <= (1ULL << 23) ? 23 : \
+       ((x) <= (1ULL << 24) ? 24 : \
+       ((x) <= (1ULL << 25) ? 25 : \
+       ((x) <= (1ULL << 26) ? 26 : \
+       ((x) <= (1ULL << 27) ? 27 : \
+       ((x) <= (1ULL << 28) ? 28 : \
+       ((x) <= (1ULL << 29) ? 29 : \
+       ((x) <= (1ULL << 30) ? 30 : \
+       ((x) <= (1ULL << 31) ? 31 : \
+       ((x) <= (1ULL << 32) ? 32 : \
+       ((x) <= (1ULL << 33) ? 33 : \
+       ((x) <= (1ULL << 34) ? 34 : \
+       ((x) <= (1ULL << 35) ? 35 : \
+       ((x) <= (1ULL << 36) ? 36 : \
+       ((x) <= (1ULL << 37) ? 37 : \
+       ((x) <= (1ULL << 38) ? 38 : \
+       ((x) <= (1ULL << 39) ? 39 : \
+       ((x) <= (1ULL << 40) ? 40 : \
+       ((x) <= (1ULL << 41) ? 41 : \
+       ((x) <= (1ULL << 42) ? 42 : \
+       ((x) <= (1ULL << 43) ? 43 : \
+       ((x) <= (1ULL << 44) ? 44 : \
+       ((x) <= (1ULL << 45) ? 45 : \
+       ((x) <= (1ULL << 46) ? 46 : \
+       ((x) <= (1ULL << 47) ? 47 : \
+       ((x) <= (1ULL << 48) ? 48 : \
+       ((x) <= (1ULL << 49) ? 49 : \
+       ((x) <= (1ULL << 50) ? 50 : \
+       ((x) <= (1ULL << 51) ? 51 : \
+       ((x) <= (1ULL << 52) ? 52 : \
+       ((x) <= (1ULL << 53) ? 53 : \
+       ((x) <= (1ULL << 54) ? 54 : \
+       ((x) <= (1ULL << 55) ? 55 : \
+       ((x) <= (1ULL << 56) ? 56 : \
+       ((x) <= (1ULL << 57) ? 57 : \
+       ((x) <= (1ULL << 58) ? 58 : \
+       ((x) <= (1ULL << 59) ? 59 : \
+        (0/0))))))))))))))))))))))))))))))))))))))))))))))))))))))))))))
 
 _ODP_STATIC_ASSERT(ODP_CONFIG_PACKET_BUF_LEN_MIN >= 256,
                  "ODP Segment size must be a minimum of 256 bytes");
diff --git a/platform/linux-generic/include/odp_buffer_pool_internal.h 
b/platform/linux-generic/include/odp_buffer_pool_internal.h
index 2e48ac3..a5a1864 100644
--- a/platform/linux-generic/include/odp_buffer_pool_internal.h
+++ b/platform/linux-generic/include/odp_buffer_pool_internal.h
@@ -64,6 +64,8 @@ typedef struct local_cache_t {
 /* Extra error checks */
 /* #define POOL_ERROR_CHECK */
 
+/* Control use of locks vs. Lockless allocation */
+#define USE_BUFFER_POOL_LOCKS 0
 
 #ifdef POOL_USE_TICKETLOCK
 #include <odp_ticketlock.h>
@@ -80,7 +82,15 @@ typedef struct local_cache_t {
 struct pool_entry_s {
 #ifdef POOL_USE_TICKETLOCK
        odp_ticketlock_t        lock ODP_ALIGNED_CACHE;
+#if USE_BUFFER_POOL_LOCKS
+       odp_ticketlock_t        buf_lock;
+       odp_ticketlock_t        blk_lock;
+#endif
 #else
+#if USE_BUFFER_POOL_LOCKS
+       odp_spinlock_t          buf_lock;
+       odp_spinlock_t          blk_lock;
+#endif
        odp_spinlock_t          lock ODP_ALIGNED_CACHE;
 #endif
 
@@ -107,8 +117,13 @@ struct pool_entry_s {
        size_t                  pool_size;
        uint32_t                buf_align;
        uint32_t                buf_stride;
-       _odp_atomic_ptr_t       buf_freelist;
-       _odp_atomic_ptr_t       blk_freelist;
+#if USE_BUFFER_POOL_LOCKS
+       odp_buffer_hdr_t       *buf_freelist;
+       void                   *blk_freelist;
+#else
+       odp_atomic_u64_t        buf_freelist;
+       odp_atomic_u64_t        blk_freelist;
+#endif
        odp_atomic_u32_t        bufcount;
        odp_atomic_u32_t        blkcount;
        odp_atomic_u64_t        bufallocs;
@@ -142,57 +157,180 @@ extern void *pool_entry_ptr[];
 #define pool_is_secure(pool) 0
 #endif
 
-#define TAG_ALIGN ((size_t)16)
+#if USE_BUFFER_POOL_LOCKS
+
+static inline void *get_blk(struct pool_entry_s *pool)
+{
+       void *block;
+
+       POOL_LOCK(&pool->blk_lock);
+
+       block = pool->blk_freelist;
+       if (block != NULL)
+               pool->blk_freelist = ((odp_buf_blk_t *)block)->next;
+
+       POOL_UNLOCK(&pool->blk_lock);
+
+       if (odp_unlikely(block == NULL)) {
+               odp_atomic_inc_u64(&pool->blkempty);
+       } else {
+               odp_atomic_inc_u64(&pool->blkallocs);
+               odp_atomic_dec_u32(&pool->blkcount);
+       }
+
+       return block;
+}
+
+static inline void ret_blk(struct pool_entry_s *pool, void *block)
+{
+       POOL_LOCK(&pool->blk_lock);
+
+       ((odp_buf_blk_t *)block)->next = pool->blk_freelist;
+       pool->blk_freelist = block;
+
+       POOL_UNLOCK(&pool->blk_lock);
+
+       odp_atomic_inc_u32(&pool->blkcount);
+       odp_atomic_inc_u64(&pool->blkfrees);
+}
+
+static inline odp_buffer_hdr_t *get_buf(struct pool_entry_s *pool)
+{
+       odp_buffer_hdr_t *buf;
+
+       POOL_LOCK(&pool->buf_lock);
+
+       buf = pool->buf_freelist;
+       if (buf != NULL)
+               pool->buf_freelist = buf->next;
+
+       POOL_UNLOCK(&pool->buf_lock);
+
+       if (odp_unlikely(buf == NULL)) {
+               odp_atomic_inc_u64(&pool->bufempty);
+       } else {
+               uint64_t bufcount =
+                       odp_atomic_fetch_sub_u32(&pool->bufcount, 1) - 1;
+
+               /* Check for low watermark condition */
+               if (bufcount == pool->low_wm && !pool->low_wm_assert) {
+                       pool->low_wm_assert = 1;
+                       odp_atomic_inc_u64(&pool->low_wm_count);
+               }
+
+               odp_atomic_inc_u64(&pool->bufallocs);
+               /* Mark buffer allocated */
+               buf->allocator = odp_thread_id();
+       }
+
+       return buf;
+}
+
+static inline void ret_buf(struct pool_entry_s *pool, odp_buffer_hdr_t *buf)
+{
+       buf->allocator = ODP_FREEBUF;  /* Mark buffer free */
+
+       if (!buf->flags.hdrdata && buf->type != ODP_BUFFER_TYPE_RAW) {
+               while (buf->segcount > 0) {
+                       if (buffer_is_secure(buf) || pool_is_secure(pool))
+                               memset(buf->addr[buf->segcount - 1],
+                                      0, buf->segsize);
+                       ret_blk(pool, buf->addr[--buf->segcount]);
+               }
+               buf->size = 0;
+       }
+
+       POOL_LOCK(&pool->buf_lock);
+
+       buf->next = pool->buf_freelist;
+       pool->buf_freelist = buf;
+
+       POOL_UNLOCK(&pool->buf_lock);
+
+       uint64_t bufcount = odp_atomic_fetch_add_u32(&pool->bufcount, 1) + 1;
+
+       /* Check if low watermark condition should be deasserted */
+       if (bufcount == pool->high_wm && pool->low_wm_assert) {
+               pool->low_wm_assert = 0;
+               odp_atomic_inc_u64(&pool->high_wm_count);
+       }
+
+       odp_atomic_inc_u64(&pool->buffrees);
+}
+
+#else
+
+#define OFFSET_BITS ODP_BITSIZE((uint64_t)ODP_BUFFER_MAX_BUFFERS * \
+                               (uint32_t)ODP_CONFIG_PACKET_BUF_LEN_MAX)
+
+#define MAX_OFFSET (1ULL << BUF_OFFSET_BITS)
+#define TAG_BITS (64 - OFFSET_BITS)
+#define TAG_ALIGN (1ULL << TAG_BITS)
+
+#define NULL_OFFSET (~0ULL)
+#define NULL_DETAGGED_OFFSET ((~0ULL) >> TAG_BITS)
 
 #define odp_cs(ptr, old, new) \
-       _odp_atomic_ptr_cmp_xchg_strong(&ptr, (void **)&old, (void *)new, \
-                                       _ODP_MEMMODEL_SC, \
-                                       _ODP_MEMMODEL_SC)
+       _odp_atomic_u64_cmp_xchg_strong_mm(&ptr, (uint64_t *)&old, \
+                                          (uint64_t)new, \
+                                          _ODP_MEMMODEL_SC, _ODP_MEMMODEL_SC)
+
+/* Helper functions for offset tagging to avoid ABA race conditions */
+#define odp_tag(offset) \
+       (((uint64_t)offset) & (TAG_ALIGN - 1))
 
-/* Helper functions for pointer tagging to avoid ABA race conditions */
-#define odp_tag(ptr) \
-       (((size_t)ptr) & (TAG_ALIGN - 1))
+#define off2ptr(offset, base) \
+       ((void *)((size_t)base + ((size_t)((uint64_t)offset >> TAG_BITS))))
 
-#define odp_detag(ptr) \
-       ((void *)(((size_t)ptr) & -TAG_ALIGN))
+#define odp_detag(offset, base) \
+       (((uint64_t)offset >> TAG_BITS) == NULL_DETAGGED_OFFSET ? NULL : \
+        off2ptr(offset, base))
 
-#define odp_retag(ptr, tag) \
-       ((void *)(((size_t)ptr) | odp_tag(tag)))
+#define ptr2off(ptr, base) \
+       ((uint64_t)(ptr == NULL ? (NULL_DETAGGED_OFFSET << TAG_BITS) : \
+                   (((uint64_t)((size_t)ptr - (size_t)base)) << TAG_BITS)))
+
+#define odp_retag(ptr, base, tag) (ptr2off(ptr, base) | odp_tag(tag))
 
 
 static inline void *get_blk(struct pool_entry_s *pool)
 {
-       void *oldhead, *myhead, *newhead;
+       uint64_t oldhead, newhead;
+       void *myhead;
 
-       oldhead = _odp_atomic_ptr_load(&pool->blk_freelist, _ODP_MEMMODEL_ACQ);
+       oldhead = odp_atomic_load_u64(&pool->blk_freelist);
 
        do {
-               size_t tag = odp_tag(oldhead);
-               myhead = odp_detag(oldhead);
+               uint64_t tag = odp_tag(oldhead);
+               myhead = odp_detag(oldhead, pool->pool_base_addr);
                if (odp_unlikely(myhead == NULL))
                        break;
-               newhead = odp_retag(((odp_buf_blk_t *)myhead)->next, tag + 1);
+               newhead = odp_retag(((odp_buf_blk_t *)myhead)->next,
+                                   pool->pool_base_addr, tag + 1);
        } while (odp_cs(pool->blk_freelist, oldhead, newhead) == 0);
 
-       if (odp_unlikely(myhead == NULL))
+       if (odp_unlikely(myhead == NULL)) {
                odp_atomic_inc_u64(&pool->blkempty);
-       else
+       } else {
+               odp_atomic_inc_u64(&pool->blkallocs);
                odp_atomic_dec_u32(&pool->blkcount);
+       }
 
-       return (void *)myhead;
+       return myhead;
 }
 
 static inline void ret_blk(struct pool_entry_s *pool, void *block)
 {
-       void *oldhead, *myhead, *myblock;
+       uint64_t oldhead, myblock;
+       void *myhead;
 
-       oldhead = _odp_atomic_ptr_load(&pool->blk_freelist, _ODP_MEMMODEL_ACQ);
+       oldhead = odp_atomic_load_u64(&pool->blk_freelist);
 
        do {
                size_t tag = odp_tag(oldhead);
-               myhead = odp_detag(oldhead);
+               myhead = odp_detag(oldhead, pool->pool_base_addr);
                ((odp_buf_blk_t *)block)->next = myhead;
-               myblock = odp_retag(block, tag + 1);
+               myblock = odp_retag(block, pool->pool_base_addr, tag + 1);
        } while (odp_cs(pool->blk_freelist, oldhead, myblock) == 0);
 
        odp_atomic_inc_u32(&pool->blkcount);
@@ -201,16 +339,18 @@ static inline void ret_blk(struct pool_entry_s *pool, 
void *block)
 
 static inline odp_buffer_hdr_t *get_buf(struct pool_entry_s *pool)
 {
-       odp_buffer_hdr_t *oldhead, *myhead, *newhead;
+       uint64_t oldhead, newhead;
+       odp_buffer_hdr_t *myhead;
 
-       oldhead = _odp_atomic_ptr_load(&pool->buf_freelist, _ODP_MEMMODEL_ACQ);
+       oldhead = odp_atomic_load_u64(&pool->buf_freelist);
 
        do {
                size_t tag = odp_tag(oldhead);
-               myhead = odp_detag(oldhead);
+               myhead = odp_detag(oldhead, pool->pool_mdata_addr);
                if (odp_unlikely(myhead == NULL))
                        break;
-               newhead = odp_retag(myhead->next, tag + 1);
+               newhead = odp_retag(myhead->next, pool->pool_mdata_addr,
+                                   tag + 1);
        } while (odp_cs(pool->buf_freelist, oldhead, newhead) == 0);
 
        if (odp_unlikely(myhead == NULL)) {
@@ -235,7 +375,8 @@ static inline odp_buffer_hdr_t *get_buf(struct pool_entry_s 
*pool)
 
 static inline void ret_buf(struct pool_entry_s *pool, odp_buffer_hdr_t *buf)
 {
-       odp_buffer_hdr_t *oldhead, *myhead, *mybuf;
+       uint64_t oldhead, mybuf;
+       odp_buffer_hdr_t *myhead;
 
        buf->allocator = ODP_FREEBUF;  /* Mark buffer free */
 
@@ -249,13 +390,13 @@ static inline void ret_buf(struct pool_entry_s *pool, 
odp_buffer_hdr_t *buf)
                buf->size = 0;
        }
 
-       oldhead = _odp_atomic_ptr_load(&pool->buf_freelist, _ODP_MEMMODEL_ACQ);
+       oldhead = odp_atomic_load_u64(&pool->buf_freelist);
 
        do {
                size_t tag = odp_tag(oldhead);
-               myhead = odp_detag(oldhead);
+               myhead = odp_detag(oldhead, pool->pool_mdata_addr);
                buf->next = myhead;
-               mybuf = odp_retag(buf, tag + 1);
+               mybuf = odp_retag(buf, pool->pool_mdata_addr, tag + 1);
        } while (odp_cs(pool->buf_freelist, oldhead, mybuf) == 0);
 
        uint64_t bufcount = odp_atomic_fetch_add_u32(&pool->bufcount, 1) + 1;
@@ -269,6 +410,8 @@ static inline void ret_buf(struct pool_entry_s *pool, 
odp_buffer_hdr_t *buf)
        odp_atomic_inc_u64(&pool->buffrees);
 }
 
+#endif
+
 static inline void *get_local_buf(local_cache_t *buf_cache,
                                  struct pool_entry_s *pool,
                                  size_t totsize)
diff --git a/platform/linux-generic/odp_buffer_pool.c 
b/platform/linux-generic/odp_buffer_pool.c
index eedb380..b905cdd 100644
--- a/platform/linux-generic/odp_buffer_pool.c
+++ b/platform/linux-generic/odp_buffer_pool.c
@@ -82,6 +82,10 @@ int odp_buffer_pool_init_global(void)
                /* init locks */
                pool_entry_t *pool = &pool_tbl->pool[i];
                POOL_LOCK_INIT(&pool->s.lock);
+#if USE_BUFFER_POOL_LOCKS
+               POOL_LOCK_INIT(&pool->s.buf_lock);
+               POOL_LOCK_INIT(&pool->s.blk_lock);
+#endif
                pool->s.pool_hdl = pool_index_to_handle(i);
                pool->s.pool_id = i;
                pool_entry_ptr[i] = pool;
@@ -91,6 +95,9 @@ int odp_buffer_pool_init_global(void)
        ODP_DBG("  pool_entry_s size     %zu\n", sizeof(struct pool_entry_s));
        ODP_DBG("  pool_entry_t size     %zu\n", sizeof(pool_entry_t));
        ODP_DBG("  odp_buffer_hdr_t size %zu\n", sizeof(odp_buffer_hdr_t));
+#if !USE_BUFFER_POOL_LOCKS
+       ODP_DBG("  offset_bits = %d, tag_bits = %d\n", OFFSET_BITS, TAG_BITS);
+#endif
        ODP_DBG("\n");
        return 0;
 }
@@ -286,10 +293,14 @@ odp_buffer_pool_t odp_buffer_pool_create(const char *name,
                pool->s.pool_mdata_addr = mdata_base_addr;
 
                pool->s.buf_stride = buf_stride;
-               _odp_atomic_ptr_store(&pool->s.buf_freelist, NULL,
-                                     _ODP_MEMMODEL_RLX);
-               _odp_atomic_ptr_store(&pool->s.blk_freelist, NULL,
-                                     _ODP_MEMMODEL_RLX);
+
+#if USE_BUFFER_POOL_LOCKS
+               pool->s.buf_freelist = NULL;
+               pool->s.blk_freelist = NULL;
+#else
+               odp_atomic_store_u64(&pool->s.buf_freelist, NULL_OFFSET);
+               odp_atomic_store_u64(&pool->s.blk_freelist, NULL_OFFSET);
+#endif
 
                /* Initialization will increment these to their target vals */
                odp_atomic_store_u32(&pool->s.bufcount, 0);
-- 
2.1.0


_______________________________________________
lng-odp mailing list
[email protected]
http://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to