From: Geoffrey Blake <[email protected]>

Change the tagged-pointer implementation used in the lockless buffer allocation
to use a 64-bit tag. It falls back to a lock-based implementation of CAS if a
16-byte CAS is unavailable. This should avoid any future ABA issues that
smaller tags could theoretically suffer.

Signed-off-by: Geoffrey Blake <[email protected]>
---
(This document/code contribution attached is provided under the terms of 
agreement LES-LTM-21309)
 configure.ac                                       |  13 +++
 .../linux-generic/include/odp_atomic_internal.h    | 111 ++++++++++++++++++---
 .../include/odp_buffer_pool_internal.h             |  71 ++++++-------
 platform/linux-generic/odp_buffer_pool.c           |   6 +-
 4 files changed, 143 insertions(+), 58 deletions(-)

diff --git a/configure.ac b/configure.ac
index f0ce7e0..64e1f33 100644
--- a/configure.ac
+++ b/configure.ac
@@ -182,6 +182,19 @@ ODP_CFLAGS="$ODP_CFLAGS -Wcast-align -Wnested-externs 
-Wcast-qual -Wformat-nonli
 ODP_CFLAGS="$ODP_CFLAGS -Wformat-security -Wundef -Wwrite-strings"
 
 ##########################################################################
+# Check if compiler supports cmpxchg16b (-mcx16)
+##########################################################################
+my_save_cflags="$CFLAGS"
+CFLAGS=-mcx16
+AC_MSG_CHECKING([whether CC supports -mcx16])
+AC_COMPILE_IFELSE([AC_LANG_PROGRAM([])],
+       [AC_MSG_RESULT([yes])]
+       [ODP_CFLAGS="$ODP_CFLAGS -mcx16"],
+       [AC_MSG_RESULT([no])]
+)
+CFLAGS="$my_save_cflags"
+
+##########################################################################
 # Default include setup
 ##########################################################################
 AM_CFLAGS="$AM_CFLAGS $ODP_CFLAGS"
diff --git a/platform/linux-generic/include/odp_atomic_internal.h 
b/platform/linux-generic/include/odp_atomic_internal.h
index a02ecc5..c964bf8 100644
--- a/platform/linux-generic/include/odp_atomic_internal.h
+++ b/platform/linux-generic/include/odp_atomic_internal.h
@@ -33,10 +33,26 @@ extern "C" {
 /**
  * Pointer atomic type
  */
+#if defined __SIZEOF_INT128__ && defined __GCC_HAVE_SYNC_COMPARE_AND_SWAP_16
+#define ODP_HAVE_CMPX16
+typedef union {
+       __uint128_t v;
+       struct {
+               uint64_t tag;
+               void *ptr;
+       } tptr;
+} _odp_atomic_ptr_t
+ODP_ALIGNED(sizeof(__int128)); /* Enforce alignment! */
+#else
 typedef struct {
-       void *v; /**< Actual storage for the atomic variable */
+       char lock;
+       struct {
+               uint64_t tag;
+               void *ptr;
+       } tptr;
 } _odp_atomic_ptr_t
-ODP_ALIGNED(sizeof(void *)); /* Enforce alignement! */
+ODP_ALIGNED(16);
+#endif
 
 /**
  * Atomic flag (boolean) type
@@ -452,6 +468,35 @@ static inline void _odp_atomic_u64_sub_mm(odp_atomic_u64_t 
*atom,
  * _odp_atomic_ptr_xchg - return old value
  *****************************************************************************/
 
+/* Check if the compiler supports lock-free atomic operations on 128-bit types */
+#if !defined ODP_HAVE_CMPX16
+/**
+ * @internal
+ * Helper macro for lock-based atomic operations on 128-bit tagged pointers
+ * @param[in,out] atom Pointer to the atomic tagged pointer variable
+ * @param expr Expression used to update the variable.
+ * @param mm Memory order to use.
+ * @return The old value of the variable.
+ */
+#define ATOMIC_TPTR_OP_MM(atom, expr, mm) \
+({ \
+        _odp_atomic_ptr_t old_val; \
+        /* Loop while lock is already taken, stop when lock becomes clear */ \
+        while (__atomic_test_and_set(&(atom)->lock, \
+               (mm) == _ODP_MEMMODEL_SC ? \
+               __ATOMIC_SEQ_CST : __ATOMIC_ACQUIRE)) \
+               (void)0; \
+        old_val.tptr = (atom->tptr); \
+        (expr); /* Perform whatever update is desired */ \
+        __atomic_clear(&(atom)->lock, \
+                (mm) == _ODP_MEMMODEL_SC ? \
+                __ATOMIC_SEQ_CST : __ATOMIC_RELEASE); \
+        __atomic_clear(&(old_val).lock, \
+                (mm) == _ODP_MEMMODEL_SC ? \
+                __ATOMIC_SEQ_CST : __ATOMIC_RELEASE); \
+        old_val; /* Return old value */ \
+})
+#endif
 /**
  * Initialization of pointer atomic variable
  *
@@ -460,7 +505,11 @@ static inline void _odp_atomic_u64_sub_mm(odp_atomic_u64_t 
*atom,
  */
 static inline void _odp_atomic_ptr_init(_odp_atomic_ptr_t *atom, void *val)
 {
-       __atomic_store_n(&atom->v, val, __ATOMIC_RELAXED);
+       __atomic_store_n(&atom->tptr.ptr, val, __ATOMIC_RELAXED);
+       __atomic_store_n(&atom->tptr.tag, 0, __ATOMIC_RELAXED);
+#ifndef ODP_HAVE_CMPX16
+       __atomic_clear(&atom->lock, __ATOMIC_RELAXED);
+#endif
 }
 
 /**
@@ -471,10 +520,16 @@ static inline void _odp_atomic_ptr_init(_odp_atomic_ptr_t 
*atom, void *val)
  *
  * @return Value of the variable
  */
-static inline void *_odp_atomic_ptr_load(const _odp_atomic_ptr_t *atom,
-               _odp_memmodel_t mmodel)
+static inline _odp_atomic_ptr_t _odp_atomic_ptr_load(_odp_atomic_ptr_t *atom,
+                                                    _odp_memmodel_t mmodel)
 {
-       return __atomic_load_n(&atom->v, mmodel);
+#ifdef ODP_HAVE_CMPX16
+       _odp_atomic_ptr_t ret;
+       ret.v = __atomic_load_n(&atom->v, mmodel);
+       return ret;
+#else
+       return ATOMIC_TPTR_OP_MM(atom, (void)0, mmodel);
+#endif
 }
 
 /**
@@ -485,10 +540,14 @@ static inline void *_odp_atomic_ptr_load(const 
_odp_atomic_ptr_t *atom,
  * @param mmodel Memory order associated with the store operation
  */
 static inline void _odp_atomic_ptr_store(_odp_atomic_ptr_t *atom,
-               void *val,
+               _odp_atomic_ptr_t *val,
                _odp_memmodel_t mmodel)
 {
-       __atomic_store_n(&atom->v, val, mmodel);
+#ifdef ODP_HAVE_CMPX16
+       __atomic_store_n(&atom->v, val->v, mmodel);
+#else
+       ATOMIC_TPTR_OP_MM(atom, atom->tptr = val->tptr, mmodel);
+#endif
 }
 
 /**
@@ -500,11 +559,17 @@ static inline void _odp_atomic_ptr_store(_odp_atomic_ptr_t *atom,
 *
 *
 * @return Old value of variable
-static inline void *_odp_atomic_ptr_xchg(_odp_atomic_ptr_t *atom,
-               void *val,
+static inline _odp_atomic_ptr_t _odp_atomic_ptr_xchg(_odp_atomic_ptr_t *atom,
+               _odp_atomic_ptr_t *val,
                _odp_memmodel_t mmodel)
 {
-       return __atomic_exchange_n(&atom->v, val, mmodel);
+#ifdef ODP_HAVE_CMPX16
+       _odp_atomic_ptr_t old;
+       old.v = __atomic_exchange_n(&atom->v, val->v, mmodel);
+       return old;
+#else
+       return ATOMIC_TPTR_OP_MM(atom, atom->tptr = val->tptr, mmodel);
+#endif
 }
 
 /**
@@ -524,13 +587,35 @@ static inline void 
*_odp_atomic_ptr_xchg(_odp_atomic_ptr_t *atom,
  */
 static inline int _odp_atomic_ptr_cmp_xchg_strong(
                _odp_atomic_ptr_t *atom,
-               void **exp,
-               void *val,
+               _odp_atomic_ptr_t *exp,
+               _odp_atomic_ptr_t *val,
                _odp_memmodel_t success,
                _odp_memmodel_t failure)
 {
-       return __atomic_compare_exchange_n(&atom->v, exp, val,
+#ifdef ODP_HAVE_CMPX16
+       return __atomic_compare_exchange_n(&atom->v, &exp->v, val->v,
                        false/*strong*/, success, failure);
+#else
+       /* Possibly we are a bit pessimistic with the memory models */
+       odp_bool_t ret_succ;
+       /* Loop while lock is already taken, stop when lock becomes clear */
+       while (__atomic_test_and_set(&(atom)->lock,
+               (success) == _ODP_MEMMODEL_SC ?
+               __ATOMIC_SEQ_CST : __ATOMIC_ACQUIRE))
+               (void)0;
+       if (atom->tptr.tag == exp->tptr.tag &&
+           atom->tptr.ptr == exp->tptr.ptr) {
+               atom->tptr = val->tptr;
+               ret_succ = 1;
+       } else {
+               exp->tptr = atom->tptr;
+               ret_succ = 0;
+       }
+       __atomic_clear(&(atom)->lock,
+                      (ret_succ ? success : failure) == _ODP_MEMMODEL_SC ?
+                      __ATOMIC_SEQ_CST : __ATOMIC_RELEASE);
+       return ret_succ;
+#endif
 }
 
 /*****************************************************************************
diff --git a/platform/linux-generic/include/odp_buffer_pool_internal.h 
b/platform/linux-generic/include/odp_buffer_pool_internal.h
index 2e48ac3..abb45c8 100644
--- a/platform/linux-generic/include/odp_buffer_pool_internal.h
+++ b/platform/linux-generic/include/odp_buffer_pool_internal.h
@@ -142,57 +142,44 @@ extern void *pool_entry_ptr[];
 #define pool_is_secure(pool) 0
 #endif
 
-#define TAG_ALIGN ((size_t)16)
-
 #define odp_cs(ptr, old, new) \
-       _odp_atomic_ptr_cmp_xchg_strong(&ptr, (void **)&old, (void *)new, \
+       _odp_atomic_ptr_cmp_xchg_strong(&ptr, &old, &new, \
                                        _ODP_MEMMODEL_SC, \
                                        _ODP_MEMMODEL_SC)
 
-/* Helper functions for pointer tagging to avoid ABA race conditions */
-#define odp_tag(ptr) \
-       (((size_t)ptr) & (TAG_ALIGN - 1))
-
-#define odp_detag(ptr) \
-       ((void *)(((size_t)ptr) & -TAG_ALIGN))
-
-#define odp_retag(ptr, tag) \
-       ((void *)(((size_t)ptr) | odp_tag(tag)))
-
-
 static inline void *get_blk(struct pool_entry_s *pool)
 {
-       void *oldhead, *myhead, *newhead;
+       _odp_atomic_ptr_t oldhead, newhead;
 
        oldhead = _odp_atomic_ptr_load(&pool->blk_freelist, _ODP_MEMMODEL_ACQ);
 
        do {
-               size_t tag = odp_tag(oldhead);
-               myhead = odp_detag(oldhead);
-               if (odp_unlikely(myhead == NULL))
+               uint64_t tag = oldhead.tptr.tag;
+               if (odp_unlikely(oldhead.tptr.ptr == NULL))
                        break;
-               newhead = odp_retag(((odp_buf_blk_t *)myhead)->next, tag + 1);
+               newhead.tptr.ptr = ((odp_buf_blk_t *)oldhead.tptr.ptr)->next;
+               newhead.tptr.tag = tag + 1;
        } while (odp_cs(pool->blk_freelist, oldhead, newhead) == 0);
 
-       if (odp_unlikely(myhead == NULL))
+       if (odp_unlikely(oldhead.tptr.ptr == NULL))
                odp_atomic_inc_u64(&pool->blkempty);
        else
                odp_atomic_dec_u32(&pool->blkcount);
 
-       return (void *)myhead;
+       return (void *)oldhead.tptr.ptr;
 }
 
 static inline void ret_blk(struct pool_entry_s *pool, void *block)
 {
-       void *oldhead, *myhead, *myblock;
+       _odp_atomic_ptr_t oldhead, myblock;
 
        oldhead = _odp_atomic_ptr_load(&pool->blk_freelist, _ODP_MEMMODEL_ACQ);
 
        do {
-               size_t tag = odp_tag(oldhead);
-               myhead = odp_detag(oldhead);
-               ((odp_buf_blk_t *)block)->next = myhead;
-               myblock = odp_retag(block, tag + 1);
+               uint64_t tag = oldhead.tptr.tag;
+               ((odp_buf_blk_t *)block)->next = oldhead.tptr.ptr;
+               myblock.tptr.ptr = block;
+               myblock.tptr.tag = tag + 1;
        } while (odp_cs(pool->blk_freelist, oldhead, myblock) == 0);
 
        odp_atomic_inc_u32(&pool->blkcount);
@@ -201,23 +188,25 @@ static inline void ret_blk(struct pool_entry_s *pool, 
void *block)
 
 static inline odp_buffer_hdr_t *get_buf(struct pool_entry_s *pool)
 {
-       odp_buffer_hdr_t *oldhead, *myhead, *newhead;
+       _odp_atomic_ptr_t oldhead, newhead;
+       odp_buffer_hdr_t *mybuf = NULL;
 
        oldhead = _odp_atomic_ptr_load(&pool->buf_freelist, _ODP_MEMMODEL_ACQ);
 
        do {
-               size_t tag = odp_tag(oldhead);
-               myhead = odp_detag(oldhead);
-               if (odp_unlikely(myhead == NULL))
+               uint64_t tag = oldhead.tptr.tag;
+               if (odp_unlikely(oldhead.tptr.ptr == NULL))
                        break;
-               newhead = odp_retag(myhead->next, tag + 1);
+               newhead.tptr.ptr = ((odp_buffer_hdr_t *)oldhead.tptr.ptr)->next;
+               newhead.tptr.tag = tag + 1;
        } while (odp_cs(pool->buf_freelist, oldhead, newhead) == 0);
 
-       if (odp_unlikely(myhead == NULL)) {
+       if (odp_unlikely(oldhead.tptr.ptr == NULL)) {
                odp_atomic_inc_u64(&pool->bufempty);
        } else {
                uint64_t bufcount =
                        odp_atomic_fetch_sub_u32(&pool->bufcount, 1) - 1;
+               mybuf = oldhead.tptr.ptr;
 
                /* Check for low watermark condition */
                if (bufcount == pool->low_wm && !pool->low_wm_assert) {
@@ -226,16 +215,16 @@ static inline odp_buffer_hdr_t *get_buf(struct 
pool_entry_s *pool)
                }
 
                odp_atomic_inc_u64(&pool->bufallocs);
-               myhead->next = myhead;  /* Mark buffer allocated */
-               myhead->allocator = odp_thread_id();
+               mybuf->next = mybuf;  /* Mark buffer allocated */
+               mybuf->allocator = odp_thread_id();
        }
 
-       return (void *)myhead;
+       return (void *)mybuf;
 }
 
 static inline void ret_buf(struct pool_entry_s *pool, odp_buffer_hdr_t *buf)
 {
-       odp_buffer_hdr_t *oldhead, *myhead, *mybuf;
+       _odp_atomic_ptr_t oldhead, newhead;
 
        buf->allocator = ODP_FREEBUF;  /* Mark buffer free */
 
@@ -252,11 +241,11 @@ static inline void ret_buf(struct pool_entry_s *pool, 
odp_buffer_hdr_t *buf)
        oldhead = _odp_atomic_ptr_load(&pool->buf_freelist, _ODP_MEMMODEL_ACQ);
 
        do {
-               size_t tag = odp_tag(oldhead);
-               myhead = odp_detag(oldhead);
-               buf->next = myhead;
-               mybuf = odp_retag(buf, tag + 1);
-       } while (odp_cs(pool->buf_freelist, oldhead, mybuf) == 0);
+               uint64_t tag = oldhead.tptr.tag;
+               buf->next = oldhead.tptr.ptr;
+               newhead.tptr.ptr = buf;
+               newhead.tptr.tag = tag + 1;
+       } while (odp_cs(pool->buf_freelist, oldhead, newhead) == 0);
 
        uint64_t bufcount = odp_atomic_fetch_add_u32(&pool->bufcount, 1) + 1;
 
diff --git a/platform/linux-generic/odp_buffer_pool.c 
b/platform/linux-generic/odp_buffer_pool.c
index eedb380..5ff844a 100644
--- a/platform/linux-generic/odp_buffer_pool.c
+++ b/platform/linux-generic/odp_buffer_pool.c
@@ -286,10 +286,8 @@ odp_buffer_pool_t odp_buffer_pool_create(const char *name,
                pool->s.pool_mdata_addr = mdata_base_addr;
 
                pool->s.buf_stride = buf_stride;
-               _odp_atomic_ptr_store(&pool->s.buf_freelist, NULL,
-                                     _ODP_MEMMODEL_RLX);
-               _odp_atomic_ptr_store(&pool->s.blk_freelist, NULL,
-                                     _ODP_MEMMODEL_RLX);
+               _odp_atomic_ptr_init(&pool->s.buf_freelist, NULL);
+               _odp_atomic_ptr_init(&pool->s.blk_freelist, NULL);
 
                /* Initialization will increment these to their target vals */
                odp_atomic_store_u32(&pool->s.bufcount, 0);
-- 
1.9.1



_______________________________________________
lng-odp mailing list
[email protected]
http://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to