On 10/19/16 15:09, Petri Savolainen wrote:
Moved the scheduler ring code into a new header file so that it can
also be used in other parts of the implementation.

Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com>
---
  platform/linux-generic/Makefile.am                 |   1 +
  platform/linux-generic/include/odp_ring_internal.h | 109 +++++++++++++++++++++
  platform/linux-generic/odp_schedule.c              | 102 ++-----------------
  3 files changed, 118 insertions(+), 94 deletions(-)
  create mode 100644 platform/linux-generic/include/odp_ring_internal.h

diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index 0fba393..63f9ce7 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -121,6 +121,7 @@ noinst_HEADERS = \
                  ${srcdir}/include/odp_pool_internal.h \
                  ${srcdir}/include/odp_posix_extensions.h \
                  ${srcdir}/include/odp_queue_internal.h \
+                 ${srcdir}/include/odp_ring_internal.h \
                  ${srcdir}/include/odp_schedule_if.h \
                  ${srcdir}/include/odp_schedule_internal.h \
                  ${srcdir}/include/odp_schedule_ordered_internal.h \
diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h
new file mode 100644
index 0000000..c89b298
--- /dev/null
+++ b/platform/linux-generic/include/odp_ring_internal.h
@@ -0,0 +1,109 @@
+/* Copyright (c) 2016, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#ifndef ODP_RING_INTERNAL_H_
+#define ODP_RING_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp/api/atomic.h>
+#include <odp/api/hints.h>
+#include <odp_align_internal.h>
+
+/* Ring empty, not a valid data value. */
+#define RING_EMPTY ((uint32_t)-1)
+
+/* Ring of uint32_t data
+ *
+ * Ring stores head and tail counters. Ring indexes are formed from these
+ * counters with a mask (mask = ring_size - 1), which requires that ring size
+ * must be a power of two. */
+typedef struct {
+       /* Writer head and tail */
+       odp_atomic_u32_t w_head;
+       odp_atomic_u32_t w_tail;
+       uint8_t pad[ODP_CACHE_LINE_SIZE - (2 * sizeof(odp_atomic_u32_t))];
+
+       /* Reader head and tail */
+       odp_atomic_u32_t r_head;
+       odp_atomic_u32_t r_tail;
+
+       uint32_t data[0];
+} ring_t ODP_ALIGNED_CACHE;
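
For illustration (an editorial sketch, not part of the patch): since the
head and tail counters only grow and are masked on every access, a
power-of-two ring size lets the 32-bit counters wrap at 2^32 without any
extra modulo logic:

    /* Sketch: mapping a monotonically increasing counter to a ring slot
     * (needs <stdint.h>). With ring_size = 8, mask = 7: counter 7 ->
     * slot 7, counter 8 -> slot 0, counter 9 -> slot 1. Unsigned
     * wraparound of the counter is harmless, because only the masked
     * low bits select the slot. */
    static inline uint32_t ring_slot(uint32_t counter, uint32_t mask)
    {
            return counter & mask;
    }
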
+
+/* Initialize ring */
+static inline void ring_init(ring_t *ring)
+{
+       odp_atomic_init_u32(&ring->w_head, 0);
+       odp_atomic_init_u32(&ring->w_tail, 0);
+       odp_atomic_init_u32(&ring->r_head, 0);
+       odp_atomic_init_u32(&ring->r_tail, 0);
+}
+
+/* Dequeue data from the ring head */
+static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
+{
+       uint32_t head, tail, new_head;
+       uint32_t data;
+
+       head = odp_atomic_load_u32(&ring->r_head);
+
+       /* Move reader head. This thread owns data at the new head. */
+       do {
+               tail = odp_atomic_load_u32(&ring->w_tail);
+
+               if (head == tail)
+                       return RING_EMPTY;
+
+               new_head = head + 1;
+
+       } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
+                             new_head) == 0));
+
+       /* Read queue index */
+       data = ring->data[new_head & mask];
+
+       /* Wait until other readers have updated the tail */
+       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
+               odp_cpu_pause();
+
+       /* Now update the reader tail */
+       odp_atomic_store_rel_u32(&ring->r_tail, new_head);
+
+       return data;
+}
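
As a usage sketch (editorial, not part of the patch; it mirrors the drain
loop in schedule_term_global() further down), a consumer keeps calling
ring_deq() until it returns RING_EMPTY; each returned value is owned
exclusively by the calling thread:

    uint32_t qi;

    while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) != RING_EMPTY)
            handle_queue_index(qi); /* hypothetical consumer function */
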
+
+/* Enqueue data into the ring tail */
+static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data)
+{
+       uint32_t old_head, new_head;
+
+       /* Reserve a slot in the ring for writing */
+       old_head = odp_atomic_fetch_inc_u32(&ring->w_head);
+       new_head = old_head + 1;
+
+       /* Ring is full. Wait for the last reader to finish. */
+       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
+               odp_cpu_pause();

Should there be some logical timeout here? For example, in ODP process mode, if another process dies without releasing the atomic counter, this will spin here forever. The same can happen with abnormal thread termination.

Maxim.
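
(Editorial note: a hard timeout is awkward at this point, because the slot
in w_head is already reserved when the wait starts, so the writer cannot
simply abort without corrupting the ring. A sketch of a bounded wait --
assuming the internal ODP_ERR() logging macro and a made-up MAX_SPIN
bound, neither of which the patch uses here -- could detect and report
the stall, but would still have to keep waiting:)

    uint64_t spins = 0;

    while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head)) {
            odp_cpu_pause();
            if (odp_unlikely(++spins == MAX_SPIN))
                    ODP_ERR("ring_enq(): reader tail stuck\n");
    }
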

+
+       /* Write data */
+       ring->data[new_head & mask] = data;
+
+       /* Wait until other writers have updated the tail */
+       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
+               odp_cpu_pause();
+
+       /* Now update the writer tail */
+       odp_atomic_store_rel_u32(&ring->w_tail, new_head);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
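
For reference, a minimal self-contained usage sketch (editorial
illustration; the type and names below are made up, but the layout matches
prio_queue_t in odp_schedule.c): the data array directly follows the ring
header, and its size must be a power of two. Note that ring_enq() blocks
when the ring is full, so users size the array to hold every value that
can be in flight, and that RING_EMPTY reserves UINT32_MAX, which therefore
cannot be stored as data:

    #include <odp_ring_internal.h>

    #define MY_RING_SIZE 64 /* must be a power of two */
    #define MY_RING_MASK (MY_RING_SIZE - 1)

    /* Ring header immediately followed by its storage, as the
     * flexible array member in ring_t expects. */
    typedef struct {
            ring_t   ring;
            uint32_t data[MY_RING_SIZE];
    } my_ring_t;

    static my_ring_t r;

    static void example(void)
    {
            uint32_t v;

            ring_init(&r.ring);
            ring_enq(&r.ring, MY_RING_MASK, 42);
            v = ring_deq(&r.ring, MY_RING_MASK); /* v == 42 */
            (void)v;
    }
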
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 81e79c9..86c98fe 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -17,12 +17,12 @@
  #include <odp/api/hints.h>
  #include <odp/api/cpu.h>
  #include <odp/api/thrmask.h>
-#include <odp/api/atomic.h>
  #include <odp_config_internal.h>
  #include <odp_align_internal.h>
  #include <odp_schedule_internal.h>
  #include <odp_schedule_ordered_internal.h>
  #include <odp/api/sync.h>
+#include <odp_ring_internal.h>

  /* Number of priority levels */
  #define NUM_PRIO 8
@@ -82,9 +82,6 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
  /* Priority queue empty, not a valid queue index. */
  #define PRIO_QUEUE_EMPTY ((uint32_t)-1)

-/* Ring empty, not a valid index. */
-#define RING_EMPTY ((uint32_t)-1)
-
  /* For best performance, the number of queues should be a power of two. */
  ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(ODP_CONFIG_QUEUES),
                  "Number_of_queues_is_not_power_of_two");
@@ -111,28 +108,10 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO,
  /* Start of named groups in group mask arrays */
  #define SCHED_GROUP_NAMED (ODP_SCHED_GROUP_CONTROL + 1)

-/* Scheduler ring
- *
- * Ring stores head and tail counters. Ring indexes are formed from these
- * counters with a mask (mask = ring_size - 1), which requires that ring size
- * must be a power of two. */
-typedef struct {
-       /* Writer head and tail */
-       odp_atomic_u32_t w_head;
-       odp_atomic_u32_t w_tail;
-       uint8_t pad[ODP_CACHE_LINE_SIZE - (2 * sizeof(odp_atomic_u32_t))];
-
-       /* Reader head and tail */
-       odp_atomic_u32_t r_head;
-       odp_atomic_u32_t r_tail;
-
-       uint32_t data[0];
-} sched_ring_t ODP_ALIGNED_CACHE;
-
  /* Priority queue */
  typedef struct {
        /* Ring header */
-       sched_ring_t ring;
+       ring_t ring;

        /* Ring data: queue indexes */
        uint32_t queue_index[PRIO_QUEUE_RING_SIZE];
@@ -142,7 +121,7 @@ typedef struct {
  /* Packet IO queue */
  typedef struct {
        /* Ring header */
-       sched_ring_t ring;
+       ring_t ring;

        /* Ring data: pktio poll command indexes */
        uint32_t cmd_index[PKTIO_RING_SIZE];
@@ -204,71 +183,6 @@ __thread sched_local_t sched_local;
  /* Function prototypes */
  static inline void schedule_release_context(void);

-static void ring_init(sched_ring_t *ring)
-{
-       odp_atomic_init_u32(&ring->w_head, 0);
-       odp_atomic_init_u32(&ring->w_tail, 0);
-       odp_atomic_init_u32(&ring->r_head, 0);
-       odp_atomic_init_u32(&ring->r_tail, 0);
-}
-
-/* Dequeue data from the ring head */
-static inline uint32_t ring_deq(sched_ring_t *ring, uint32_t mask)
-{
-       uint32_t head, tail, new_head;
-       uint32_t data;
-
-       head = odp_atomic_load_u32(&ring->r_head);
-
-       /* Move reader head. This thread owns data at the new head. */
-       do {
-               tail = odp_atomic_load_u32(&ring->w_tail);
-
-               if (head == tail)
-                       return RING_EMPTY;
-
-               new_head = head + 1;
-
-       } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
-                             new_head) == 0));
-
-       /* Read queue index */
-       data = ring->data[new_head & mask];
-
-       /* Wait until other readers have updated the tail */
-       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
-               odp_cpu_pause();
-
-       /* Now update the reader tail */
-       odp_atomic_store_rel_u32(&ring->r_tail, new_head);
-
-       return data;
-}
-
-/* Enqueue data into the ring tail */
-static inline void ring_enq(sched_ring_t *ring, uint32_t mask, uint32_t data)
-{
-       uint32_t old_head, new_head;
-
-       /* Reserve a slot in the ring for writing */
-       old_head = odp_atomic_fetch_inc_u32(&ring->w_head);
-       new_head = old_head + 1;
-
-       /* Ring is full. Wait for the last reader to finish. */
-       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
-               odp_cpu_pause();
-
-       /* Write data */
-       ring->data[new_head & mask] = data;
-
-       /* Wait until other writers have updated the tail */
-       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
-               odp_cpu_pause();
-
-       /* Now update the writer tail */
-       odp_atomic_store_rel_u32(&ring->w_tail, new_head);
-}
-
  static void sched_local_init(void)
  {
        memset(&sched_local, 0, sizeof(sched_local_t));
@@ -346,7 +260,7 @@ static int schedule_term_global(void)

        for (i = 0; i < NUM_PRIO; i++) {
                for (j = 0; j < QUEUES_PER_PRIO; j++) {
-                       sched_ring_t *ring = &sched->prio_q[i][j].ring;
+                       ring_t *ring = &sched->prio_q[i][j].ring;
                        uint32_t qi;

                        while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) !=
@@ -540,7 +454,7 @@ static void schedule_release_atomic(void)
        if (qi != PRIO_QUEUE_EMPTY && sched_local.num  == 0) {
                int prio           = sched->queue[qi].prio;
                int queue_per_prio = sched->queue[qi].queue_per_prio;
-               sched_ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
+               ring_t *ring       = &sched->prio_q[prio][queue_per_prio].ring;

                /* Release current atomic queue */
                ring_enq(ring, PRIO_QUEUE_MASK, qi);
@@ -635,7 +549,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                        int grp;
                        int ordered;
                        odp_queue_t handle;
-                       sched_ring_t *ring;
+                       ring_t *ring;

                        if (id >= QUEUES_PER_PRIO)
                                id = 0;
@@ -746,7 +660,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],

        for (i = 0; i < PKTIO_CMD_QUEUES; i++, id = ((id + 1) &
             PKTIO_CMD_QUEUE_MASK)) {
-               sched_ring_t *ring;
+               ring_t *ring;
                uint32_t cmd_index;
                pktio_cmd_t *cmd;
@@ -1041,7 +955,7 @@ static int schedule_sched_queue(uint32_t queue_index)
  {
        int prio           = sched->queue[queue_index].prio;
        int queue_per_prio = sched->queue[queue_index].queue_per_prio;
-       sched_ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
+       ring_t *ring       = &sched->prio_q[prio][queue_per_prio].ring;

        sched_local.ignore_ordered_context = 1;
