On 5 April 2017 at 14:20, Maxim Uvarov <[email protected]> wrote: > On 04/05/17 01:46, Ola Liljedahl wrote: >> On 4 April 2017 at 21:25, Maxim Uvarov <[email protected]> wrote: >>> it's better to have 2 separate files for that. One for ODP_CONFIG_LLDSCD >> "better"? In what way? Please respond to the question. If you claim something is "better", you must be able to explain *why* it is better.
*We* have explained why we think it is better to keep both implementations in the same file, close to each other. I think Brian's explanation was very good. >> >>> defined and one for not. Also ODP_ prefix should not be used for >>> internal things (not api). >> OK this was not clear, some of the defines in odp_config_internal.h >> use an ODP_ prefix, some not. You mean there is a system to that? >> >> Shouldn't those defines that are part of the API be declared/described >> in the API header files (located in include/odp/api/spec)? How else do >> you know that they are part of the API? And if they are part of the >> API, how does the application (the 'A' in API) access the definitions >> *and* their values? >> >> There are API's for querying about things like total number of queues >> but those API's are separate and do not depend on some define with a >> specific name. >> > > That is not api setting. It's linux-generic internal settings. ODP apps > do not use that values. > > Maxim. > >>> >>> Maxim. >>> >>> On 04/04/17 21:48, Brian Brooks wrote: >>>> Signed-off-by: Ola Liljedahl <[email protected]> >>>> Reviewed-by: Brian Brooks <[email protected]> >>>> --- >>>> platform/linux-generic/include/odp_llqueue.h | 285 >>>> +++++++++++++++++++++++++++ >>>> 1 file changed, 285 insertions(+) >>>> create mode 100644 platform/linux-generic/include/odp_llqueue.h >>>> >>>> diff --git a/platform/linux-generic/include/odp_llqueue.h >>>> b/platform/linux-generic/include/odp_llqueue.h >>>> new file mode 100644 >>>> index 00000000..aa46ace3 >>>> --- /dev/null >>>> +++ b/platform/linux-generic/include/odp_llqueue.h >>>> @@ -0,0 +1,285 @@ >>>> +/* Copyright (c) 2017, ARM Limited. >>>> + * All rights reserved. >>>> + * >>>> + * SPDX-License-Identifier: BSD-3-Clause >>>> + */ >>>> + >>>> +#ifndef ODP_LLQUEUE_H_ >>>> +#define ODP_LLQUEUE_H_ >>>> + >>>> +#include <odp/api/cpu.h> >>>> +#include <odp/api/hints.h> >>>> +#include <odp/api/spinlock.h> >>>> + >>>> +#include <odp_config_internal.h> >>>> +#include <odp_debug_internal.h> >>>> +#include <odp_llsc.h> >>>> + >>>> +#include <stdint.h> >>>> +#include <stdlib.h> >>>> + >>>> +/****************************************************************************** >>>> + * Linked list queues >>>> + >>>> *****************************************************************************/ >>>> + >>>> +/* The scalar equivalent of a double pointer */ >>>> +#if __SIZEOF_PTRDIFF_T__ == 4 >>>> +typedef uint64_t dintptr_t; >>>> +#endif >>>> +#if __SIZEOF_PTRDIFF_T__ == 8 >>>> +typedef __int128 dintptr_t; >>>> +#endif >>>> + >>>> +#define SENTINEL ((void *)~(uintptr_t)0) >>>> + >>>> +struct llnode { >>>> + struct llnode *next; >>>> +}; >>>> + >>>> +union llht { >>>> + struct { >>>> + struct llnode *head, *tail; >>>> + } st; >>>> + dintptr_t ui; >>>> +}; >>>> + >>>> +struct llqueue { >>>> + union llht u; >>>> +#ifndef ODP_CONFIG_LLDSCD >>>> + odp_spinlock_t lock; >>>> +#endif >>>> +}; >>>> + >>>> +static inline struct llnode *llq_head(struct llqueue *llq) >>>> +{ >>>> + return __atomic_load_n(&llq->u.st.head, __ATOMIC_RELAXED); >>>> +} >>>> + >>>> +static inline void llqueue_init(struct llqueue *llq) >>>> +{ >>>> + llq->u.st.head = NULL; >>>> + llq->u.st.tail = NULL; >>>> +#ifndef ODP_CONFIG_LLDSCD >>>> + odp_spinlock_init(&llq->lock); >>>> +#endif >>>> +} >>>> + >>>> +#ifdef ODP_CONFIG_LLDSCD >>>> + >>>> +static inline void llq_enqueue(struct llqueue *llq, struct llnode *node) >>>> +{ >>>> + union llht old, neu; >>>> + >>>> + ODP_ASSERT(node->next == NULL); >>>> + node->next = SENTINEL; >>>> + do { >>>> + old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED); >>>> + neu.st.head = old.st.head == NULL ? node : old.st.head; >>>> + neu.st.tail = node; >>>> + } while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELEASE))); >>>> + if (old.st.tail != NULL) { >>>> + /* List was not empty */ >>>> + ODP_ASSERT(old.st.tail->next == SENTINEL); >>>> + old.st.tail->next = node; >>>> + } >>>> +} >>>> + >>>> +#else >>>> + >>>> +static inline void llq_enqueue(struct llqueue *llq, struct llnode *node) >>>> +{ >>>> + ODP_ASSERT(node->next == NULL); >>>> + node->next = SENTINEL; >>>> + >>>> + odp_spinlock_lock(&llq->lock); >>>> + if (llq->u.st.head == NULL) { >>>> + llq->u.st.head = node; >>>> + llq->u.st.tail = node; >>>> + } else { >>>> + llq->u.st.tail->next = node; >>>> + llq->u.st.tail = node; >>>> + } >>>> + odp_spinlock_unlock(&llq->lock); >>>> +} >>>> + >>>> +#endif >>>> + >>>> +#ifdef ODP_CONFIG_LLDSCD >>>> + >>>> +static inline struct llnode *llq_dequeue(struct llqueue *llq) >>>> +{ >>>> + struct llnode *head; >>>> + union llht old, neu; >>>> + >>>> + /* llq_dequeue() may be used in a busy-waiting fashion >>>> + * Read head using plain load to avoid disturbing remote LL/SC >>>> + */ >>>> + head = __atomic_load_n(&llq->u.st.head, __ATOMIC_ACQUIRE); >>>> + if (head == NULL) >>>> + return NULL; >>>> + /* Read head->next before LL to minimize cache miss latency >>>> + * in LL/SC below >>>> + */ >>>> + (void)__atomic_load_n(&head->next, __ATOMIC_RELAXED); >>>> + >>>> + do { >>>> + old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED); >>>> + if (odp_unlikely(old.st.head == NULL)) { >>>> + /* Empty list */ >>>> + return NULL; >>>> + } else if (odp_unlikely(old.st.head == old.st.tail)) { >>>> + /* Single-element in list */ >>>> + neu.st.head = NULL; >>>> + neu.st.tail = NULL; >>>> + } else { >>>> + /* Multi-element list, dequeue head */ >>>> + struct llnode *next; >>>> + /* Wait until llq_enqueue() has written true next >>>> + * pointer >>>> + */ >>>> + while ((next = __atomic_load_n(&old.st.head->next, >>>> + __ATOMIC_RELAXED)) == >>>> + SENTINEL) >>>> + odp_cpu_pause(); >>>> + neu.st.head = next; >>>> + neu.st.tail = old.st.tail; >>>> + } >>>> + } while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED))); >>>> + old.st.head->next = NULL; >>>> + return old.st.head; >>>> +} >>>> + >>>> +#else >>>> + >>>> +static inline struct llnode *llq_dequeue(struct llqueue *llq) >>>> +{ >>>> + struct llnode *head; >>>> + struct llnode *node = NULL; >>>> + >>>> + head = __atomic_load_n(&llq->u.st.head, __ATOMIC_RELAXED); >>>> + if (head == NULL) >>>> + return NULL; >>>> + >>>> + odp_spinlock_lock(&llq->lock); >>>> + if (llq->u.st.head != NULL) { >>>> + node = llq->u.st.head; >>>> + if (llq->u.st.head == llq->u.st.tail) { >>>> + ODP_ASSERT(node->next == SENTINEL); >>>> + llq->u.st.head = NULL; >>>> + llq->u.st.tail = NULL; >>>> + } else { >>>> + ODP_ASSERT(node->next != SENTINEL); >>>> + llq->u.st.head = node->next; >>>> + } >>>> + node->next = NULL; >>>> + } >>>> + odp_spinlock_unlock(&llq->lock); >>>> + return node; >>>> +} >>>> + >>>> +#endif >>>> + >>>> +#ifdef ODP_CONFIG_LLDSCD >>>> + >>>> +static inline odp_bool_t llq_dequeue_cond(struct llqueue *llq, >>>> + struct llnode *exp) >>>> +{ >>>> + union llht old, neu; >>>> + >>>> + do { >>>> + old.ui = lld(&llq->u.ui, __ATOMIC_ACQUIRE); >>>> + if (odp_unlikely(old.st.head == NULL || old.st.head != exp)) >>>> { >>>> + /* Empty list or wrong head */ >>>> + return false; >>>> + } else if (odp_unlikely(old.st.head == old.st.tail)) { >>>> + /* Single-element in list */ >>>> + neu.st.head = NULL; >>>> + neu.st.tail = NULL; >>>> + } else { >>>> + /* Multi-element list, dequeue head */ >>>> + struct llnode *next; >>>> + >>>> + /* Wait until llq_enqueue() has written true next >>>> + * pointer */ >>>> + while ((next = __atomic_load_n(&old.st.head->next, >>>> + __ATOMIC_RELAXED)) == >>>> + SENTINEL) >>>> + odp_cpu_pause(); >>>> + >>>> + neu.st.head = next; >>>> + neu.st.tail = old.st.tail; >>>> + } >>>> + } while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED))); >>>> + old.st.head->next = NULL; >>>> + return true; >>>> +} >>>> + >>>> +#else >>>> + >>>> +static inline odp_bool_t llq_dequeue_cond(struct llqueue *llq, >>>> + struct llnode *node) >>>> +{ >>>> + odp_bool_t success = false; >>>> + >>>> + odp_spinlock_lock(&llq->lock); >>>> + if (odp_likely(llq->u.st.head != NULL && llq->u.st.head == node)) { >>>> + success = true; >>>> + if (llq->u.st.head == llq->u.st.tail) { >>>> + ODP_ASSERT(node->next == SENTINEL); >>>> + llq->u.st.head = NULL; >>>> + llq->u.st.tail = NULL; >>>> + } else { >>>> + ODP_ASSERT(node->next != SENTINEL); >>>> + llq->u.st.head = node->next; >>>> + } >>>> + node->next = NULL; >>>> + } >>>> + odp_spinlock_unlock(&llq->lock); >>>> + return success; >>>> +} >>>> + >>>> +#endif >>>> + >>>> +#ifdef ODP_CONFIG_LLDSCD >>>> + >>>> +/* If 'node' is a head of llq then move it to tail */ >>>> +static inline odp_bool_t llq_cond_rotate(struct llqueue *llq, >>>> + struct llnode *node) >>>> +{ >>>> + /* Difficult to make this into a single atomic operation >>>> + * Instead use existing primitives. >>>> + */ >>>> + if (odp_likely(llq_dequeue_cond(llq, node))) { >>>> + llq_enqueue(llq, node); >>>> + return true; >>>> + } >>>> + return false; >>>> +} >>>> + >>>> +#else >>>> + >>>> +/* If 'node' is a head of llq then move it to tail */ >>>> +static inline odp_bool_t llq_cond_rotate(struct llqueue *llq, >>>> + struct llnode *node) >>>> +{ >>>> + odp_bool_t success = false; >>>> + >>>> + odp_spinlock_lock(&llq->lock); >>>> + if (odp_likely(llq->u.st.head == node)) { >>>> + success = true; >>>> + if (llq->u.st.tail != node) { >>>> + ODP_ASSERT(node->next != SENTINEL); >>>> + llq->u.st.head = node->next; >>>> + llq->u.st.tail->next = node; >>>> + llq->u.st.tail = node; >>>> + node->next = SENTINEL; >>>> + } >>>> + /* Else 'node' is only element on list => nothing to do */ >>>> + } >>>> + odp_spinlock_unlock(&llq->lock); >>>> + return success; >>>> +} >>>> + >>>> +#endif >>>> + >>>> +#endif >>>> >>> >
