On 5 April 2017 at 14:20, Maxim Uvarov <[email protected]> wrote:
> On 04/05/17 01:46, Ola Liljedahl wrote:
>> On 4 April 2017 at 21:25, Maxim Uvarov <[email protected]> wrote:
>>> it's better to have 2 separate files for that. One for ODP_CONFIG_LLDSCD
>> "better"? In what way?
Please respond to the question. If you claim something is "better",
you must be able to explain *why* it is better.

*We* have explained why we think it is better to keep both
implementations in the same file, close to each other. I think Brian's
explanation was very good.

>>
>>> defined and one for when it is not. Also, the ODP_ prefix should not be
>>> used for internal (non-API) things.
>> OK, this was not clear: some of the defines in odp_config_internal.h
>> use an ODP_ prefix and some do not. Do you mean there is a system to that?
>>
>> Shouldn't those defines that are part of the API be declared/described
>> in the API header files (located in include/odp/api/spec)? How else do
>> you know that they are part of the API? And if they are part of the
>> API, how does the application (the 'A' in API) access the definitions
>> *and* their values?
>>
>> There are APIs for querying things like the total number of queues,
>> but those APIs are separate and do not depend on some define with a
>> specific name.
>>
>
> That is not an API setting. Those are linux-generic internal settings. ODP
> apps do not use those values.
>
> Maxim.
>
>>>
>>> Maxim.
>>>
>>> On 04/04/17 21:48, Brian Brooks wrote:
>>>> Signed-off-by: Ola Liljedahl <[email protected]>
>>>> Reviewed-by: Brian Brooks <[email protected]>
>>>> ---
>>>>  platform/linux-generic/include/odp_llqueue.h | 285 +++++++++++++++++++++++++++
>>>>  1 file changed, 285 insertions(+)
>>>>  create mode 100644 platform/linux-generic/include/odp_llqueue.h
>>>>
>>>> diff --git a/platform/linux-generic/include/odp_llqueue.h b/platform/linux-generic/include/odp_llqueue.h
>>>> new file mode 100644
>>>> index 00000000..aa46ace3
>>>> --- /dev/null
>>>> +++ b/platform/linux-generic/include/odp_llqueue.h
>>>> @@ -0,0 +1,285 @@
>>>> +/* Copyright (c) 2017, ARM Limited.
>>>> + * All rights reserved.
>>>> + *
>>>> + * SPDX-License-Identifier:        BSD-3-Clause
>>>> + */
>>>> +
>>>> +#ifndef ODP_LLQUEUE_H_
>>>> +#define ODP_LLQUEUE_H_
>>>> +
>>>> +#include <odp/api/cpu.h>
>>>> +#include <odp/api/hints.h>
>>>> +#include <odp/api/spinlock.h>
>>>> +
>>>> +#include <odp_config_internal.h>
>>>> +#include <odp_debug_internal.h>
>>>> +#include <odp_llsc.h>
>>>> +
>>>> +#include <stdint.h>
>>>> +#include <stdlib.h>
>>>> +
>>>> +/******************************************************************************
>>>> + * Linked list queues
>>>> + *****************************************************************************/
>>>> +
>>>> +/* The scalar equivalent of a double pointer */
>>>> +#if __SIZEOF_PTRDIFF_T__ == 4
>>>> +typedef uint64_t dintptr_t;
>>>> +#endif
>>>> +#if __SIZEOF_PTRDIFF_T__ == 8
>>>> +typedef __int128 dintptr_t;
>>>> +#endif
>>>> +
>>>> +#define SENTINEL ((void *)~(uintptr_t)0)
>>>> +
>>>> +struct llnode {
>>>> +     struct llnode *next;
>>>> +};
>>>> +
>>>> +union llht {
>>>> +     struct {
>>>> +             struct llnode *head, *tail;
>>>> +     } st;
>>>> +     dintptr_t ui;
>>>> +};
>>>> +
>>>> +struct llqueue {
>>>> +     union llht u;
>>>> +#ifndef ODP_CONFIG_LLDSCD
>>>> +     odp_spinlock_t lock;
>>>> +#endif
>>>> +};
>>>> +
>>>> +static inline struct llnode *llq_head(struct llqueue *llq)
>>>> +{
>>>> +     return __atomic_load_n(&llq->u.st.head, __ATOMIC_RELAXED);
>>>> +}
>>>> +
>>>> +static inline void llqueue_init(struct llqueue *llq)
>>>> +{
>>>> +     llq->u.st.head = NULL;
>>>> +     llq->u.st.tail = NULL;
>>>> +#ifndef ODP_CONFIG_LLDSCD
>>>> +     odp_spinlock_init(&llq->lock);
>>>> +#endif
>>>> +}
>>>> +
>>>> +#ifdef ODP_CONFIG_LLDSCD
>>>> +
>>>> +static inline void llq_enqueue(struct llqueue *llq, struct llnode *node)
>>>> +{
>>>> +     union llht old, neu;
>>>> +
>>>> +     ODP_ASSERT(node->next == NULL);
>>>> +     node->next = SENTINEL;
>>>> +     do {
>>>> +             old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED);
>>>> +             neu.st.head = old.st.head == NULL ? node : old.st.head;
>>>> +             neu.st.tail = node;
>>>> +     } while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELEASE)));
>>>> +     if (old.st.tail != NULL) {
>>>> +             /* List was not empty */
>>>> +             ODP_ASSERT(old.st.tail->next == SENTINEL);
>>>> +             old.st.tail->next = node;
>>>> +     }
>>>> +}
>>>> +
>>>> +#else
>>>> +
>>>> +static inline void llq_enqueue(struct llqueue *llq, struct llnode *node)
>>>> +{
>>>> +     ODP_ASSERT(node->next == NULL);
>>>> +     node->next = SENTINEL;
>>>> +
>>>> +     odp_spinlock_lock(&llq->lock);
>>>> +     if (llq->u.st.head == NULL) {
>>>> +             llq->u.st.head = node;
>>>> +             llq->u.st.tail = node;
>>>> +     } else {
>>>> +             llq->u.st.tail->next = node;
>>>> +             llq->u.st.tail = node;
>>>> +     }
>>>> +     odp_spinlock_unlock(&llq->lock);
>>>> +}
>>>> +
>>>> +#endif
>>>> +
>>>> +#ifdef ODP_CONFIG_LLDSCD
>>>> +
>>>> +static inline struct llnode *llq_dequeue(struct llqueue *llq)
>>>> +{
>>>> +     struct llnode *head;
>>>> +     union llht old, neu;
>>>> +
>>>> +     /* llq_dequeue() may be used in a busy-waiting fashion.
>>>> +      * Read head using a plain load (not lld()) to avoid disturbing
>>>> +      * remote LL/SC.
>>>> +      */
>>>> +     head = __atomic_load_n(&llq->u.st.head, __ATOMIC_ACQUIRE);
>>>> +     if (head == NULL)
>>>> +             return NULL;
>>>> +     /* Read head->next before LL to minimize cache miss latency
>>>> +      * in LL/SC below
>>>> +      */
>>>> +     (void)__atomic_load_n(&head->next, __ATOMIC_RELAXED);
>>>> +
>>>> +     do {
>>>> +             old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED);
>>>> +             if (odp_unlikely(old.st.head == NULL)) {
>>>> +                     /* Empty list */
>>>> +                     return NULL;
>>>> +             } else if (odp_unlikely(old.st.head == old.st.tail)) {
>>>> +                     /* Single-element in list */
>>>> +                     neu.st.head = NULL;
>>>> +                     neu.st.tail = NULL;
>>>> +             } else {
>>>> +                     /* Multi-element list, dequeue head */
>>>> +                     struct llnode *next;
>>>> +                     /* Wait until llq_enqueue() has written true next
>>>> +                      * pointer
>>>> +                      */
>>>> +                     while ((next = __atomic_load_n(&old.st.head->next,
>>>> +                                                    __ATOMIC_RELAXED)) ==
>>>> +                             SENTINEL)
>>>> +                             odp_cpu_pause();
>>>> +                     neu.st.head = next;
>>>> +                     neu.st.tail = old.st.tail;
>>>> +             }
>>>> +     } while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)));
>>>> +     old.st.head->next = NULL;
>>>> +     return old.st.head;
>>>> +}
>>>> +
>>>> +#else
>>>> +
>>>> +static inline struct llnode *llq_dequeue(struct llqueue *llq)
>>>> +{
>>>> +     struct llnode *head;
>>>> +     struct llnode *node = NULL;
>>>> +
>>>> +     head = __atomic_load_n(&llq->u.st.head, __ATOMIC_RELAXED);
>>>> +     if (head == NULL)
>>>> +             return NULL;
>>>> +
>>>> +     odp_spinlock_lock(&llq->lock);
>>>> +     if (llq->u.st.head != NULL) {
>>>> +             node = llq->u.st.head;
>>>> +             if (llq->u.st.head == llq->u.st.tail) {
>>>> +                     ODP_ASSERT(node->next == SENTINEL);
>>>> +                     llq->u.st.head = NULL;
>>>> +                     llq->u.st.tail = NULL;
>>>> +             } else {
>>>> +                     ODP_ASSERT(node->next != SENTINEL);
>>>> +                     llq->u.st.head = node->next;
>>>> +             }
>>>> +             node->next = NULL;
>>>> +     }
>>>> +     odp_spinlock_unlock(&llq->lock);
>>>> +     return node;
>>>> +}
>>>> +
>>>> +#endif
>>>> +
>>>> +#ifdef ODP_CONFIG_LLDSCD
>>>> +
>>>> +static inline odp_bool_t llq_dequeue_cond(struct llqueue *llq,
>>>> +                                       struct llnode *exp)
>>>> +{
>>>> +     union llht old, neu;
>>>> +
>>>> +     do {
>>>> +             old.ui = lld(&llq->u.ui, __ATOMIC_ACQUIRE);
>>>> +             if (odp_unlikely(old.st.head == NULL || old.st.head != exp)) 
>>>> {
>>>> +                     /* Empty list or wrong head */
>>>> +                     return false;
>>>> +             } else if (odp_unlikely(old.st.head == old.st.tail)) {
>>>> +                     /* Single-element in list */
>>>> +                     neu.st.head = NULL;
>>>> +                     neu.st.tail = NULL;
>>>> +             } else {
>>>> +                     /* Multi-element list, dequeue head */
>>>> +                     struct llnode *next;
>>>> +
>>>> +                     /* Wait until llq_enqueue() has written true next
>>>> +                      * pointer */
>>>> +                     while ((next = __atomic_load_n(&old.st.head->next,
>>>> +                                                    __ATOMIC_RELAXED)) ==
>>>> +                             SENTINEL)
>>>> +                             odp_cpu_pause();
>>>> +
>>>> +                     neu.st.head = next;
>>>> +                     neu.st.tail = old.st.tail;
>>>> +             }
>>>> +     } while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)));
>>>> +     old.st.head->next = NULL;
>>>> +     return true;
>>>> +}
>>>> +
>>>> +#else
>>>> +
>>>> +static inline odp_bool_t llq_dequeue_cond(struct llqueue *llq,
>>>> +                                       struct llnode *node)
>>>> +{
>>>> +     odp_bool_t success = false;
>>>> +
>>>> +     odp_spinlock_lock(&llq->lock);
>>>> +     if (odp_likely(llq->u.st.head != NULL && llq->u.st.head == node)) {
>>>> +             success = true;
>>>> +             if (llq->u.st.head == llq->u.st.tail) {
>>>> +                     ODP_ASSERT(node->next == SENTINEL);
>>>> +                     llq->u.st.head = NULL;
>>>> +                     llq->u.st.tail = NULL;
>>>> +             } else {
>>>> +                     ODP_ASSERT(node->next != SENTINEL);
>>>> +                     llq->u.st.head = node->next;
>>>> +             }
>>>> +             node->next = NULL;
>>>> +     }
>>>> +     odp_spinlock_unlock(&llq->lock);
>>>> +     return success;
>>>> +}
>>>> +
>>>> +#endif
>>>> +
>>>> +#ifdef ODP_CONFIG_LLDSCD
>>>> +
>>>> +/* If 'node' is a head of llq then move it to tail */
>>>> +static inline odp_bool_t llq_cond_rotate(struct llqueue *llq,
>>>> +                                      struct llnode *node)
>>>> +{
>>>> +     /* Difficult to make this into a single atomic operation
>>>> +      * Instead use existing primitives.
>>>> +      */
>>>> +     if (odp_likely(llq_dequeue_cond(llq, node))) {
>>>> +             llq_enqueue(llq, node);
>>>> +             return true;
>>>> +     }
>>>> +     return false;
>>>> +}
>>>> +
>>>> +#else
>>>> +
>>>> +/* If 'node' is a head of llq then move it to tail */
>>>> +static inline odp_bool_t llq_cond_rotate(struct llqueue *llq,
>>>> +                                      struct llnode *node)
>>>> +{
>>>> +     odp_bool_t success = false;
>>>> +
>>>> +     odp_spinlock_lock(&llq->lock);
>>>> +     if (odp_likely(llq->u.st.head == node)) {
>>>> +             success = true;
>>>> +             if (llq->u.st.tail != node) {
>>>> +                     ODP_ASSERT(node->next != SENTINEL);
>>>> +                     llq->u.st.head = node->next;
>>>> +                     llq->u.st.tail->next = node;
>>>> +                     llq->u.st.tail = node;
>>>> +                     node->next = SENTINEL;
>>>> +             }
>>>> +             /* Else 'node' is only element on list => nothing to do */
>>>> +     }
>>>> +     odp_spinlock_unlock(&llq->lock);
>>>> +     return success;
>>>> +}
>>>> +
>>>> +#endif
>>>> +
>>>> +#endif
>>>>
>>>
>

Reply via email to