On 06/19/17 22:12, Brian Brooks wrote:
Signed-off-by: Ola Liljedahl <ola.liljed...@arm.com>
Reviewed-by: Brian Brooks <brian.bro...@arm.com>
---
  platform/linux-generic/Makefile.am           |   1 +
  platform/linux-generic/include/odp_llqueue.h | 309 +++++++++++++++++++++++++++
  2 files changed, 310 insertions(+)
  create mode 100644 platform/linux-generic/include/odp_llqueue.h

diff --git a/platform/linux-generic/Makefile.am 
b/platform/linux-generic/Makefile.am
index b869fd4b..90cc4ca6 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -157,6 +157,7 @@ noinst_HEADERS = \
                  ${srcdir}/include/odp_errno_define.h \
                  ${srcdir}/include/odp_forward_typedefs_internal.h \
                  ${srcdir}/include/odp_internal.h \
+                 ${srcdir}/include/odp_llqueue.h \
                  ${srcdir}/include/odp_name_table_internal.h \
                  ${srcdir}/include/odp_packet_internal.h \
                  ${srcdir}/include/odp_packet_io_internal.h \
diff --git a/platform/linux-generic/include/odp_llqueue.h 
b/platform/linux-generic/include/odp_llqueue.h
new file mode 100644
index 00000000..758af490
--- /dev/null
+++ b/platform/linux-generic/include/odp_llqueue.h
@@ -0,0 +1,309 @@
+/* Copyright (c) 2017, ARM Limited.
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:        BSD-3-Clause
+ */
+
+#ifndef ODP_LLQUEUE_H_
+#define ODP_LLQUEUE_H_
+
+#include <odp/api/cpu.h>
+#include <odp/api/hints.h>
+#include <odp/api/spinlock.h>
+
+#include <odp_config_internal.h>
+#include <odp_debug_internal.h>
+#include <odp_cpu.h>
+
+#include <stdint.h>
+#include <stdlib.h>
+
+/******************************************************************************
+ * Linked list queues
+ *****************************************************************************/
+
+struct llqueue;
+struct llnode;
+
+static struct llnode *llq_head(struct llqueue *llq);
+static void llqueue_init(struct llqueue *llq);
+static void llq_enqueue(struct llqueue *llq, struct llnode *node);
+static struct llnode *llq_dequeue(struct llqueue *llq);
+static odp_bool_t llq_dequeue_cond(struct llqueue *llq, struct llnode *exp);
+static odp_bool_t llq_cond_rotate(struct llqueue *llq, struct llnode *node);
+static odp_bool_t llq_on_queue(struct llnode *node);
+
+/******************************************************************************
+ * The implementation(s)
+ *****************************************************************************/
+
+#define SENTINEL ((void *)~(uintptr_t)0)
+
+#ifdef CONFIG_LLDSCD
+/* Implement queue operations using double-word LL/SC */
+
+/* The scalar equivalent of a double pointer */
+#if __SIZEOF_PTRDIFF_T__ == 4
+typedef uint64_t dintptr_t;
+#endif
+#if __SIZEOF_PTRDIFF_T__ == 8
+typedef __int128 dintptr_t;
+#endif
+
+struct llnode {
+       struct llnode *next;
+};
+
+union llht {
+       struct {
+               struct llnode *head, *tail;
+       } st;
+       dintptr_t ui;

you use dintptr_t only once. It's here. No need for typedef. Remove ifdef above. Place it here and change to #else if. And code will be 2 times smaller.

+};
+
+struct llqueue {
+       union llht u;
+};

that looks not clear for me. 'struct llqueue' and 'union llht' is the same thing. After that you reference to it that as 'struct llqueue' in function arguments and 'union llht' inside the function. Like bellow...

+
+static inline struct llnode *llq_head(struct llqueue *llq)
+{
+       return __atomic_load_n(&llq->u.st.head, __ATOMIC_RELAXED);
+}
+
+static inline void llqueue_init(struct llqueue *llq)
+{
+       llq->u.st.head = NULL;
+       llq->u.st.tail = NULL;
+}
+
+static inline void llq_enqueue(struct llqueue *llq, struct llnode *node)
+{
+       union llht old, neu;
+

here.

btw neu has to be new, right?

+       ODP_ASSERT(node->next == NULL);
+       node->next = SENTINEL;
+       do {
+               old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED);
+               neu.st.head = old.st.head == NULL ? node : old.st.head;
+               neu.st.tail = node;
+       } while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELEASE)));
+       if (old.st.tail != NULL) {
+               /* List was not empty */
+               ODP_ASSERT(old.st.tail->next == SENTINEL);
+               old.st.tail->next = node;
+       }
+}
+
+static inline struct llnode *llq_dequeue(struct llqueue *llq)
+{
+       struct llnode *head;
+       union llht old, neu;
+
+       /* llq_dequeue() may be used in a busy-waiting fashion
+        * Read head using plain load to avoid disturbing remote LL/SC
+        */
+       head = __atomic_load_n(&llq->u.st.head, __ATOMIC_ACQUIRE);
+       if (head == NULL)
+               return NULL;
+       /* Read head->next before LL to minimize cache miss latency
+        * in LL/SC below
+        */
+       (void)__atomic_load_n(&head->next, __ATOMIC_RELAXED);
+
+       do {
+               old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED);
+               if (odp_unlikely(old.st.head == NULL)) {
+                       /* Empty list */
+                       return NULL;
+               } else if (odp_unlikely(old.st.head == old.st.tail)) {
+                       /* Single-element in list */
+                       neu.st.head = NULL;
+                       neu.st.tail = NULL;
+               } else {
+                       /* Multi-element list, dequeue head */
+                       struct llnode *next;
+                       /* Wait until llq_enqueue() has written true next
+                        * pointer
+                        */
+                       while ((next = __atomic_load_n(&old.st.head->next,
+                                                      __ATOMIC_RELAXED)) ==
+                               SENTINEL)
+                               odp_cpu_pause();
+                       neu.st.head = next;
+                       neu.st.tail = old.st.tail;
+               }
+       } while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)));
+       old.st.head->next = NULL;
+       return old.st.head;
+}
+
+static inline odp_bool_t llq_dequeue_cond(struct llqueue *llq,
+                                         struct llnode *exp)
+{
+       union llht old, neu;
+
+       do {
+               old.ui = lld(&llq->u.ui, __ATOMIC_ACQUIRE);
+               if (odp_unlikely(old.st.head == NULL || old.st.head != exp)) {
+                       /* Empty list or wrong head */
+                       return false;
+               } else if (odp_unlikely(old.st.head == old.st.tail)) {
+                       /* Single-element in list */
+                       neu.st.head = NULL;
+                       neu.st.tail = NULL;
+               } else {
+                       /* Multi-element list, dequeue head */
+                       struct llnode *next;
+
+                       /* Wait until llq_enqueue() has written true next
+                        * pointer */
+                       while ((next = __atomic_load_n(&old.st.head->next,
+                                                      __ATOMIC_RELAXED)) ==
+                               SENTINEL)
+                               odp_cpu_pause();
+
+                       neu.st.head = next;
+                       neu.st.tail = old.st.tail;
+               }
+       } while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)));
+       old.st.head->next = NULL;
+       return true;
+}
+
+/* If 'node' is a head of llq then move it to tail */
+static inline odp_bool_t llq_cond_rotate(struct llqueue *llq,
+                                        struct llnode *node)
+{
+       /* Difficult to make this into a single atomic operation
+        * Instead use existing primitives.
+        */
+       if (odp_likely(llq_dequeue_cond(llq, node))) {
+               llq_enqueue(llq, node);
+               return true;
+       }
+       return false;
+}
+
+static inline odp_bool_t llq_on_queue(struct llnode *node)
+{
+       return node->next != NULL;
+}
+
+#else


can chunk bellow be in separate file? I think it was on some earlier comments.

+/* Implement queue operations protected by a spin lock */
+
+struct llnode {
+       struct llnode *next;
+};
+
+struct llqueue {
+       struct llnode *head, *tail;
+       odp_spinlock_t lock;
+};
+
+static inline struct llnode *llq_head(struct llqueue *llq)
+{
+       return __atomic_load_n(&llq->head, __ATOMIC_RELAXED);
+}
+
+static inline void llqueue_init(struct llqueue *llq)
+{
+       llq->head = NULL;
+       llq->tail = NULL;
+       odp_spinlock_init(&llq->lock);
+}
+
+static inline void llq_enqueue(struct llqueue *llq, struct llnode *node)
+{
+       ODP_ASSERT(node->next == NULL);
+       node->next = SENTINEL;
+
+       odp_spinlock_lock(&llq->lock);
+       if (llq->head == NULL) {
+               llq->head = node;
+               llq->tail = node;
+       } else {
+               llq->tail->next = node;
+               llq->tail = node;
+       }
+       odp_spinlock_unlock(&llq->lock);
+}
+
+static inline struct llnode *llq_dequeue(struct llqueue *llq)
+{
+       struct llnode *head;
+       struct llnode *node = NULL;
+
+       head = __atomic_load_n(&llq->head, __ATOMIC_RELAXED);
+       if (head == NULL)
+               return NULL;
+
+       odp_spinlock_lock(&llq->lock);
+       if (llq->head != NULL) {
+               node = llq->head;
+               if (llq->head == llq->tail) {
+                       ODP_ASSERT(node->next == SENTINEL);
+                       llq->head = NULL;
+                       llq->tail = NULL;
+               } else {
+                       ODP_ASSERT(node->next != SENTINEL);
+                       llq->head = node->next;
+               }
+               node->next = NULL;
+       }
+       odp_spinlock_unlock(&llq->lock);
+       return node;
+}
+
+static inline odp_bool_t llq_dequeue_cond(struct llqueue *llq,
+                                         struct llnode *node)
+{
+       odp_bool_t success = false;
+
+       odp_spinlock_lock(&llq->lock);
+       if (odp_likely(llq->head != NULL && llq->head == node)) {
+               success = true;
+               if (llq->head == llq->tail) {
+                       ODP_ASSERT(node->next == SENTINEL);
+                       llq->head = NULL;
+                       llq->tail = NULL;
+               } else {
+                       ODP_ASSERT(node->next != SENTINEL);
+                       llq->head = node->next;
+               }
+               node->next = NULL;
+       }
+       odp_spinlock_unlock(&llq->lock);
+       return success;
+}
+
+/* If 'node' is a head of llq then move it to tail */
+static inline odp_bool_t llq_cond_rotate(struct llqueue *llq,
+                                        struct llnode *node)
+{
+       odp_bool_t success = false;
+
+       odp_spinlock_lock(&llq->lock);
+       if (odp_likely(llq->head == node)) {
+               success = true;
+               if (llq->tail != node) {
+                       ODP_ASSERT(node->next != SENTINEL);
+                       llq->head = node->next;
+                       llq->tail->next = node;
+                       llq->tail = node;
+                       node->next = SENTINEL;
+               }
+               /* Else 'node' is only element on list => nothing to do */
+       }
+       odp_spinlock_unlock(&llq->lock);
+       return success;
+}
+
+static inline odp_bool_t llq_on_queue(struct llnode *node)
+{
+       return node->next != NULL;
+}

that is common function, has to be out of ifdef.

Maxim.

+
+#endif
+
+#endif


Reply via email to