Remove the old ordered queue code. Ordered queues are temporarily handled as atomic queues. Signed-off-by: Matias Elo <matias....@nokia.com> --- platform/linux-generic/Makefile.am | 3 - .../linux-generic/include/odp_buffer_internal.h | 7 - .../linux-generic/include/odp_packet_io_queue.h | 5 +- .../linux-generic/include/odp_queue_internal.h | 26 +- platform/linux-generic/include/odp_schedule_if.h | 3 +- .../linux-generic/include/odp_schedule_internal.h | 50 -- .../include/odp_schedule_ordered_internal.h | 25 - platform/linux-generic/odp_packet_io.c | 17 +- platform/linux-generic/odp_queue.c | 57 +- platform/linux-generic/odp_schedule.c | 83 ++- platform/linux-generic/odp_schedule_ordered.c | 818 --------------------- platform/linux-generic/odp_schedule_sp.c | 3 +- platform/linux-generic/odp_traffic_mngr.c | 28 +- platform/linux-generic/pktio/loop.c | 2 +- 14 files changed, 103 insertions(+), 1024 deletions(-) delete mode 100644 platform/linux-generic/include/odp_schedule_internal.h delete mode 100644 platform/linux-generic/include/odp_schedule_ordered_internal.h delete mode 100644 platform/linux-generic/odp_schedule_ordered.c
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index b60eacb..adbe24d 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -153,8 +153,6 @@ noinst_HEADERS = \ ${srcdir}/include/odp_queue_internal.h \ ${srcdir}/include/odp_ring_internal.h \ ${srcdir}/include/odp_schedule_if.h \ - ${srcdir}/include/odp_schedule_internal.h \ - ${srcdir}/include/odp_schedule_ordered_internal.h \ ${srcdir}/include/odp_sorted_list_internal.h \ ${srcdir}/include/odp_shm_internal.h \ ${srcdir}/include/odp_timer_internal.h \ @@ -208,7 +206,6 @@ __LIB__libodp_linux_la_SOURCES = \ odp_rwlock_recursive.c \ odp_schedule.c \ odp_schedule_if.c \ - odp_schedule_ordered.c \ odp_schedule_sp.c \ odp_shared_memory.c \ odp_sorted_list.c \ diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h index 4e75908..2064f7c 100644 --- a/platform/linux-generic/include/odp_buffer_internal.h +++ b/platform/linux-generic/include/odp_buffer_internal.h @@ -79,7 +79,6 @@ struct odp_buffer_hdr_t { uint32_t all; struct { uint32_t hdrdata:1; /* Data is in buffer hdr */ - uint32_t sustain:1; /* Sustain order */ }; } flags; @@ -95,12 +94,6 @@ struct odp_buffer_hdr_t { uint32_t uarea_size; /* size of user area */ uint32_t segcount; /* segment count */ uint32_t segsize; /* segment size */ - uint64_t order; /* sequence for ordered queues */ - queue_entry_t *origin_qe; /* ordered queue origin */ - union { - queue_entry_t *target_qe; /* ordered queue target */ - uint64_t sync[SCHEDULE_ORDERED_LOCKS_PER_QUEUE]; - }; #ifdef _ODP_PKTIO_IPC /* ipc mapped process can not walk over pointers, * offset has to be used */ diff --git a/platform/linux-generic/include/odp_packet_io_queue.h b/platform/linux-generic/include/odp_packet_io_queue.h index 13b79f3..d1d4b22 100644 --- a/platform/linux-generic/include/odp_packet_io_queue.h +++ b/platform/linux-generic/include/odp_packet_io_queue.h @@ -28,11 
+28,10 @@ extern "C" { ODP_STATIC_ASSERT(ODP_PKTIN_QUEUE_MAX_BURST >= QUEUE_MULTI_MAX, "ODP_PKTIN_DEQ_MULTI_MAX_ERROR"); -int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain); +int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *queue); -int pktin_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num, - int sustain); +int pktin_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); int pktin_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index e223d9f..df36b76 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -41,11 +41,11 @@ extern "C" { /* forward declaration */ union queue_entry_u; -typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *, int); +typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *); typedef odp_buffer_hdr_t *(*deq_func_t)(union queue_entry_u *); typedef int (*enq_multi_func_t)(union queue_entry_u *, - odp_buffer_hdr_t **, int, int); + odp_buffer_hdr_t **, int); typedef int (*deq_multi_func_t)(union queue_entry_u *, odp_buffer_hdr_t **, int); @@ -68,12 +68,6 @@ struct queue_entry_s { odp_pktin_queue_t pktin; odp_pktout_queue_t pktout; char name[ODP_QUEUE_NAME_LEN]; - uint64_t order_in; - uint64_t order_out; - odp_buffer_hdr_t *reorder_head; - odp_buffer_hdr_t *reorder_tail; - odp_atomic_u64_t sync_in[SCHEDULE_ORDERED_LOCKS_PER_QUEUE]; - odp_atomic_u64_t sync_out[SCHEDULE_ORDERED_LOCKS_PER_QUEUE]; }; union queue_entry_u { @@ -84,24 +78,12 @@ union queue_entry_u { queue_entry_t *get_qentry(uint32_t queue_id); -int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain); +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); odp_buffer_hdr_t 
*queue_deq(queue_entry_t *queue); -int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num, - int sustain); +int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); -int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, - int sustain); -int queue_pktout_enq_multi(queue_entry_t *queue, - odp_buffer_hdr_t *buf_hdr[], int num, int sustain); - -int queue_tm_reenq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, - int sustain); -int queue_tm_reenq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], - int num, int sustain); -int queue_tm_reorder(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); - void queue_lock(queue_entry_t *queue); void queue_unlock(queue_entry_t *queue); diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h index 37f88a4..72af01e 100644 --- a/platform/linux-generic/include/odp_schedule_if.h +++ b/platform/linux-generic/include/odp_schedule_if.h @@ -31,8 +31,7 @@ typedef int (*schedule_init_queue_fn_t)(uint32_t queue_index, typedef void (*schedule_destroy_queue_fn_t)(uint32_t queue_index); typedef int (*schedule_sched_queue_fn_t)(uint32_t queue_index); typedef int (*schedule_ord_enq_multi_fn_t)(uint32_t queue_index, - void *buf_hdr[], int num, - int sustain, int *ret); + void *buf_hdr[], int num, int *ret); typedef int (*schedule_init_global_fn_t)(void); typedef int (*schedule_term_global_fn_t)(void); typedef int (*schedule_init_local_fn_t)(void); diff --git a/platform/linux-generic/include/odp_schedule_internal.h b/platform/linux-generic/include/odp_schedule_internal.h deleted file mode 100644 index 02637c2..0000000 --- a/platform/linux-generic/include/odp_schedule_internal.h +++ /dev/null @@ -1,50 +0,0 @@ -/* Copyright (c) 2016, Linaro Limited - * All rights reserved. 
- * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#ifndef ODP_SCHEDULE_INTERNAL_H_ -#define ODP_SCHEDULE_INTERNAL_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -/* Maximum number of dequeues */ -#define MAX_DEQ CONFIG_BURST_SIZE - -typedef struct { - int thr; - int num; - int index; - int pause; - uint16_t round; - uint16_t prefer_offset; - uint16_t pktin_polls; - uint32_t queue_index; - odp_queue_t queue; - odp_event_t ev_stash[MAX_DEQ]; - void *origin_qe; - uint64_t order; - uint64_t sync[SCHEDULE_ORDERED_LOCKS_PER_QUEUE]; - odp_pool_t pool; - int enq_called; - int ignore_ordered_context; -} sched_local_t; - -extern __thread sched_local_t sched_local; - -void cache_order_info(uint32_t queue_index); -int release_order(void *origin_qe, uint64_t order, - odp_pool_t pool, int enq_called); - -/* API functions implemented in odp_schedule_ordered.c */ -void schedule_order_lock(unsigned lock_index); -void schedule_order_unlock(unsigned lock_index); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/platform/linux-generic/include/odp_schedule_ordered_internal.h b/platform/linux-generic/include/odp_schedule_ordered_internal.h deleted file mode 100644 index 0ffbe3a..0000000 --- a/platform/linux-generic/include/odp_schedule_ordered_internal.h +++ /dev/null @@ -1,25 +0,0 @@ -/* Copyright (c) 2016, Linaro Limited - * All rights reserved. 
- * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#ifndef ODP_SCHEDULE_ORDERED_INTERNAL_H_ -#define ODP_SCHEDULE_ORDERED_INTERNAL_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -#define SUSTAIN_ORDER 1 - -int schedule_ordered_queue_enq(uint32_t queue_index, void *p_buf_hdr, - int sustain, int *ret); -int schedule_ordered_queue_enq_multi(uint32_t queue_index, void *p_buf_hdr[], - int num, int sustain, int *ret); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c index 7566789..98460a5 100644 --- a/platform/linux-generic/odp_packet_io.c +++ b/platform/linux-generic/odp_packet_io.c @@ -570,7 +570,7 @@ static inline int pktin_recv_buf(odp_pktin_queue_t queue, int ret; dst_queue = queue_to_qentry(pkt_hdr->dst_queue); - ret = queue_enq(dst_queue, buf_hdr, 0); + ret = queue_enq(dst_queue, buf_hdr); if (ret < 0) odp_packet_free(pkt); continue; @@ -619,7 +619,7 @@ int pktout_deq_multi(queue_entry_t *qentry ODP_UNUSED, } int pktin_enqueue(queue_entry_t *qentry ODP_UNUSED, - odp_buffer_hdr_t *buf_hdr ODP_UNUSED, int sustain ODP_UNUSED) + odp_buffer_hdr_t *buf_hdr ODP_UNUSED) { ODP_ABORT("attempted enqueue to a pktin queue"); return -1; @@ -641,14 +641,13 @@ odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry) return NULL; if (pkts > 1) - queue_enq_multi(qentry, &hdr_tbl[1], pkts - 1, 0); + queue_enq_multi(qentry, &hdr_tbl[1], pkts - 1); buf_hdr = hdr_tbl[0]; return buf_hdr; } int pktin_enq_multi(queue_entry_t *qentry ODP_UNUSED, - odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED, - int num ODP_UNUSED, int sustain ODP_UNUSED) + odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED, int num ODP_UNUSED) { ODP_ABORT("attempted enqueue to a pktin queue"); return 0; @@ -682,7 +681,7 @@ int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num) hdr_tbl[j] = hdr_tbl[i]; if (j) - queue_enq_multi(qentry, hdr_tbl, j, 0); + queue_enq_multi(qentry, hdr_tbl, j); return nbr; } @@ -720,7 +719,7 @@ int 
sched_cb_pktin_poll(int pktio_index, int num_queue, int index[]) queue = entry->s.in_queue[index[idx]].queue; qentry = queue_to_qentry(queue); - queue_enq_multi(qentry, hdr_tbl, num, 0); + queue_enq_multi(qentry, hdr_tbl, num); } return 0; @@ -1386,9 +1385,9 @@ int odp_pktout_queue_config(odp_pktio_t pktio, qentry->s.pktout.pktio = pktio; /* Override default enqueue / dequeue functions */ - qentry->s.enqueue = queue_pktout_enq; + qentry->s.enqueue = pktout_enqueue; qentry->s.dequeue = pktout_dequeue; - qentry->s.enqueue_multi = queue_pktout_enq_multi; + qentry->s.enqueue_multi = pktout_enq_multi; qentry->s.dequeue_multi = pktout_deq_multi; entry->s.out_queue[i].queue = queue; diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 43e212a..74f384d 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -23,7 +23,6 @@ #include <odp/api/hints.h> #include <odp/api/sync.h> #include <odp/api/traffic_mngr.h> -#include <odp_schedule_ordered_internal.h> #define NUM_INTERNAL_QUEUES 64 @@ -90,16 +89,13 @@ static int queue_init(queue_entry_t *queue, const char *name, queue->s.head = NULL; queue->s.tail = NULL; - queue->s.reorder_head = NULL; - queue->s.reorder_tail = NULL; - return 0; } int odp_queue_init_global(void) { - uint32_t i, j; + uint32_t i; odp_shm_t shm; ODP_DBG("Queue init ... 
"); @@ -119,10 +115,6 @@ int odp_queue_init_global(void) /* init locks */ queue_entry_t *queue = get_qentry(i); LOCK_INIT(&queue->s.lock); - for (j = 0; j < SCHEDULE_ORDERED_LOCKS_PER_QUEUE; j++) { - odp_atomic_init_u64(&queue->s.sync_in[j], 0); - odp_atomic_init_u64(&queue->s.sync_out[j], 0); - } queue->s.index = i; queue->s.handle = queue_from_id(i); } @@ -310,12 +302,6 @@ int odp_queue_destroy(odp_queue_t handle) ODP_ERR("queue \"%s\" not empty\n", queue->s.name); return -1; } - if (queue_is_ordered(queue) && queue->s.reorder_head) { - UNLOCK(&queue->s.lock); - ODP_ERR("queue \"%s\" reorder queue not empty\n", - queue->s.name); - return -1; - } switch (queue->s.status) { case QUEUE_STATUS_READY: @@ -379,15 +365,14 @@ odp_queue_t odp_queue_lookup(const char *name) } static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], - int num, int sustain) + int num) { int sched = 0; int i, ret; odp_buffer_hdr_t *hdr, *tail, *next_hdr; - /* Ordered queues do not use bursts */ if (sched_fn->ord_enq_multi(queue->s.index, (void **)buf_hdr, num, - sustain, &ret)) + &ret)) return ret; /* Optimize the common case of single enqueue */ @@ -395,12 +380,14 @@ static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], tail = buf_hdr[0]; hdr = tail; hdr->burst_num = 0; + hdr->next = NULL; } else { int next; /* Start from the last buffer header */ tail = buf_hdr[num - 1]; hdr = tail; + hdr->next = NULL; next = num - 2; while (1) { @@ -453,17 +440,16 @@ static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], return num; /* All events enqueued */ } -int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num, - int sustain) +int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) { - return enq_multi(queue, buf_hdr, num, sustain); + return enq_multi(queue, buf_hdr, num); } -int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain) +int queue_enq(queue_entry_t 
*queue, odp_buffer_hdr_t *buf_hdr) { int ret; - ret = enq_multi(queue, &buf_hdr, 1, sustain); + ret = enq_multi(queue, &buf_hdr, 1); if (ret == 1) return 0; @@ -486,7 +472,7 @@ int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num) buf_hdr[i] = buf_hdl_to_hdr(odp_buffer_from_event(ev[i])); return num == 0 ? 0 : queue->s.enqueue_multi(queue, buf_hdr, - num, SUSTAIN_ORDER); + num); } int odp_queue_enq(odp_queue_t handle, odp_event_t ev) @@ -500,7 +486,7 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev) /* No chains via this entry */ buf_hdr->link = NULL; - return queue->s.enqueue(queue, buf_hdr, SUSTAIN_ORDER); + return queue->s.enqueue(queue, buf_hdr); } static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], @@ -557,22 +543,6 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], i++; } - /* Ordered queue book keeping inside the lock */ - if (queue_is_ordered(queue)) { - for (j = 0; j < i; j++) { - uint32_t k; - - buf_hdr[j]->origin_qe = queue; - buf_hdr[j]->order = queue->s.order_in++; - for (k = 0; k < queue->s.param.sched.lock_count; k++) { - buf_hdr[j]->sync[k] = - odp_atomic_fetch_inc_u64 - (&queue->s.sync_in[k]); - } - buf_hdr[j]->flags.sustain = SUSTAIN_ORDER; - } - } - /* Write head only if updated */ if (updated) queue->s.head = hdr; @@ -583,11 +553,6 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], UNLOCK(&queue->s.lock); - /* Init origin_qe for non-ordered queues */ - if (!queue_is_ordered(queue)) - for (j = 0; j < i; j++) - buf_hdr[j]->origin_qe = NULL; - return i; } diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index cab68a3..50639ff 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -19,10 +19,9 @@ #include <odp/api/thrmask.h> #include <odp_config_internal.h> #include <odp_align_internal.h> -#include <odp_schedule_internal.h> -#include 
<odp_schedule_ordered_internal.h> #include <odp/api/sync.h> #include <odp_ring_internal.h> +#include <odp_queue_internal.h> /* Number of priority levels */ #define NUM_PRIO 8 @@ -108,6 +107,24 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO, /* Start of named groups in group mask arrays */ #define SCHED_GROUP_NAMED (ODP_SCHED_GROUP_CONTROL + 1) +/* Maximum number of dequeues */ +#define MAX_DEQ CONFIG_BURST_SIZE + +/* Scheduler local data */ +typedef struct { + int thr; + int num; + int index; + int pause; + uint16_t round; + uint16_t prefer_offset; + uint16_t pktin_polls; + uint32_t queue_index; + odp_queue_t queue; + odp_event_t ev_stash[MAX_DEQ]; + void *queue_entry; +} sched_local_t; + /* Priority queue */ typedef struct { /* Ring header */ @@ -465,23 +482,16 @@ static void schedule_release_atomic(void) static void schedule_release_ordered(void) { - if (sched_local.origin_qe) { - int rc = release_order(sched_local.origin_qe, - sched_local.order, - sched_local.pool, - sched_local.enq_called); - if (rc == 0) - sched_local.origin_qe = NULL; - } + /* Process ordered queue as atomic */ + schedule_release_atomic(); + sched_local.queue_entry = NULL; } static inline void schedule_release_context(void) { - if (sched_local.origin_qe != NULL) { - release_order(sched_local.origin_qe, sched_local.order, - sched_local.pool, sched_local.enq_called); - sched_local.origin_qe = NULL; - } else + if (sched_local.queue_entry != NULL) + schedule_release_ordered(); + else schedule_release_atomic(); } @@ -500,6 +510,18 @@ static inline int copy_events(odp_event_t out_ev[], unsigned int max) return i; } +static int schedule_ord_enq_multi(uint32_t queue_index, void *buf_hdr[], + int num, int *ret) +{ + (void)queue_index; + (void)buf_hdr; + (void)num; + (void)ret; + + /* didn't consume the events */ + return 0; +} + /* * Schedule queues */ @@ -596,12 +618,11 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[], ordered = sched_cb_queue_is_ordered(qi); - 
/* For ordered queues we want consecutive events to - * be dispatched to separate threads, so do not cache - * them locally. - */ - if (ordered) - max_deq = 1; + /* Do not cache ordered events locally to improve + * parallelism. Ordered context can only be released + * when the local cache is empty. */ + if (ordered && max_num < MAX_DEQ) + max_deq = max_num; num = sched_cb_queue_deq_multi(qi, sched_local.ev_stash, max_deq); @@ -626,11 +647,9 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[], ret = copy_events(out_ev, max_num); if (ordered) { - /* Continue scheduling ordered queues */ - ring_enq(ring, PRIO_QUEUE_MASK, qi); - - /* Cache order info about this event */ - cache_order_info(qi); + /* Operate as atomic */ + sched_local.queue_index = qi; + sched_local.queue_entry = get_qentry(qi); } else if (sched_cb_queue_is_atomic(qi)) { /* Hold queue during atomic access */ sched_local.queue_index = qi; @@ -763,6 +782,14 @@ static void order_unlock(void) { } +static void schedule_order_lock(unsigned lock_index ODP_UNUSED) +{ +} + +static void schedule_order_unlock(unsigned lock_index ODP_UNUSED) +{ +} + static void schedule_pause(void) { sched_local.pause = 1; @@ -975,8 +1002,6 @@ static int schedule_sched_queue(uint32_t queue_index) int queue_per_prio = sched->queue[queue_index].queue_per_prio; ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring; - sched_local.ignore_ordered_context = 1; - ring_enq(ring, PRIO_QUEUE_MASK, queue_index); return 0; } @@ -995,7 +1020,7 @@ const schedule_fn_t schedule_default_fn = { .init_queue = schedule_init_queue, .destroy_queue = schedule_destroy_queue, .sched_queue = schedule_sched_queue, - .ord_enq_multi = schedule_ordered_queue_enq_multi, + .ord_enq_multi = schedule_ord_enq_multi, .init_global = schedule_init_global, .term_global = schedule_term_global, .init_local = schedule_init_local, diff --git a/platform/linux-generic/odp_schedule_ordered.c b/platform/linux-generic/odp_schedule_ordered.c deleted file mode 
100644 index 5574faf..0000000 --- a/platform/linux-generic/odp_schedule_ordered.c +++ /dev/null @@ -1,818 +0,0 @@ -/* Copyright (c) 2016, Linaro Limited - * All rights reserved. - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include <odp_packet_io_queue.h> -#include <odp_queue_internal.h> -#include <odp_schedule_if.h> -#include <odp_schedule_ordered_internal.h> -#include <odp_traffic_mngr_internal.h> -#include <odp_schedule_internal.h> - -#define RESOLVE_ORDER 0 -#define NOAPPEND 0 -#define APPEND 1 - -static inline void sched_enq_called(void) -{ - sched_local.enq_called = 1; -} - -static inline void get_sched_order(queue_entry_t **origin_qe, uint64_t *order) -{ - if (sched_local.ignore_ordered_context) { - sched_local.ignore_ordered_context = 0; - *origin_qe = NULL; - } else { - *origin_qe = sched_local.origin_qe; - *order = sched_local.order; - } -} - -static inline void sched_order_resolved(odp_buffer_hdr_t *buf_hdr) -{ - if (buf_hdr) - buf_hdr->origin_qe = NULL; - sched_local.origin_qe = NULL; -} - -static inline void get_qe_locks(queue_entry_t *qe1, queue_entry_t *qe2) -{ - /* Special case: enq to self */ - if (qe1 == qe2) { - queue_lock(qe1); - return; - } - - /* Since any queue can be either a source or target, queues do not have - * a natural locking hierarchy. Create one by using the qentry address - * as the ordering mechanism. - */ - - if (qe1 < qe2) { - queue_lock(qe1); - queue_lock(qe2); - } else { - queue_lock(qe2); - queue_lock(qe1); - } -} - -static inline void free_qe_locks(queue_entry_t *qe1, queue_entry_t *qe2) -{ - queue_unlock(qe1); - if (qe1 != qe2) - queue_unlock(qe2); -} - -static inline odp_buffer_hdr_t *get_buf_tail(odp_buffer_hdr_t *buf_hdr) -{ - odp_buffer_hdr_t *buf_tail = buf_hdr->link ? 
buf_hdr->link : buf_hdr; - - buf_hdr->next = buf_hdr->link; - buf_hdr->link = NULL; - - while (buf_tail->next) - buf_tail = buf_tail->next; - - return buf_tail; -} - -static inline void queue_add_list(queue_entry_t *queue, - odp_buffer_hdr_t *buf_head, - odp_buffer_hdr_t *buf_tail) -{ - if (queue->s.head) - queue->s.tail->next = buf_head; - else - queue->s.head = buf_head; - - queue->s.tail = buf_tail; -} - -static inline void queue_add_chain(queue_entry_t *queue, - odp_buffer_hdr_t *buf_hdr) -{ - queue_add_list(queue, buf_hdr, get_buf_tail(buf_hdr)); -} - -static inline void reorder_enq(queue_entry_t *queue, - uint64_t order, - queue_entry_t *origin_qe, - odp_buffer_hdr_t *buf_hdr, - int sustain) -{ - odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head; - odp_buffer_hdr_t *reorder_prev = NULL; - - while (reorder_buf && order >= reorder_buf->order) { - reorder_prev = reorder_buf; - reorder_buf = reorder_buf->next; - } - - buf_hdr->next = reorder_buf; - - if (reorder_prev) - reorder_prev->next = buf_hdr; - else - origin_qe->s.reorder_head = buf_hdr; - - if (!reorder_buf) - origin_qe->s.reorder_tail = buf_hdr; - - buf_hdr->origin_qe = origin_qe; - buf_hdr->target_qe = queue; - buf_hdr->order = order; - buf_hdr->flags.sustain = sustain; -} - -static inline void order_release(queue_entry_t *origin_qe, int count) -{ - uint64_t sync; - uint32_t i; - - origin_qe->s.order_out += count; - - for (i = 0; i < origin_qe->s.param.sched.lock_count; i++) { - sync = odp_atomic_load_u64(&origin_qe->s.sync_out[i]); - if (sync < origin_qe->s.order_out) - odp_atomic_fetch_add_u64(&origin_qe->s.sync_out[i], - origin_qe->s.order_out - sync); - } -} - -static inline int reorder_deq(queue_entry_t *queue, - queue_entry_t *origin_qe, - odp_buffer_hdr_t **reorder_tail_return, - odp_buffer_hdr_t **placeholder_buf_return, - int *release_count_return, - int *placeholder_count_return) -{ - odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head; - odp_buffer_hdr_t *reorder_tail = NULL; - 
odp_buffer_hdr_t *placeholder_buf = NULL; - odp_buffer_hdr_t *next_buf; - int deq_count = 0; - int release_count = 0; - int placeholder_count = 0; - - while (reorder_buf && - reorder_buf->order <= origin_qe->s.order_out + - release_count + placeholder_count) { - /* - * Elements on the reorder list fall into one of - * three categories: - * - * 1. Those destined for the same queue. These - * can be enq'd now if they were waiting to - * be unblocked by this enq. - * - * 2. Those representing placeholders for events - * whose ordering was released by a prior - * odp_schedule_release_ordered() call. These - * can now just be freed. - * - * 3. Those representing events destined for another - * queue. These cannot be consolidated with this - * enq since they have a different target. - * - * Detecting an element with an order sequence gap, an - * element in category 3, or running out of elements - * stops the scan. - */ - next_buf = reorder_buf->next; - - if (odp_likely(reorder_buf->target_qe == queue)) { - /* promote any chain */ - odp_buffer_hdr_t *reorder_link = - reorder_buf->link; - - if (reorder_link) { - reorder_buf->next = reorder_link; - reorder_buf->link = NULL; - while (reorder_link->next) - reorder_link = reorder_link->next; - reorder_link->next = next_buf; - reorder_tail = reorder_link; - } else { - reorder_tail = reorder_buf; - } - - deq_count++; - if (!reorder_buf->flags.sustain) - release_count++; - reorder_buf = next_buf; - } else if (!reorder_buf->target_qe) { - if (reorder_tail) - reorder_tail->next = next_buf; - else - origin_qe->s.reorder_head = next_buf; - - reorder_buf->next = placeholder_buf; - placeholder_buf = reorder_buf; - - reorder_buf = next_buf; - placeholder_count++; - } else { - break; - } - } - - *reorder_tail_return = reorder_tail; - *placeholder_buf_return = placeholder_buf; - *release_count_return = release_count; - *placeholder_count_return = placeholder_count; - - return deq_count; -} - -static inline void 
reorder_complete(queue_entry_t *origin_qe, - odp_buffer_hdr_t **reorder_buf_return, - odp_buffer_hdr_t **placeholder_buf, - int placeholder_append) -{ - odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head; - odp_buffer_hdr_t *next_buf; - - *reorder_buf_return = NULL; - if (!placeholder_append) - *placeholder_buf = NULL; - - while (reorder_buf && - reorder_buf->order <= origin_qe->s.order_out) { - next_buf = reorder_buf->next; - - if (!reorder_buf->target_qe) { - origin_qe->s.reorder_head = next_buf; - reorder_buf->next = *placeholder_buf; - *placeholder_buf = reorder_buf; - - reorder_buf = next_buf; - order_release(origin_qe, 1); - } else if (reorder_buf->flags.sustain) { - reorder_buf = next_buf; - } else { - *reorder_buf_return = origin_qe->s.reorder_head; - origin_qe->s.reorder_head = - origin_qe->s.reorder_head->next; - break; - } - } -} - -static inline void get_queue_order(queue_entry_t **origin_qe, uint64_t *order, - odp_buffer_hdr_t *buf_hdr) -{ - if (buf_hdr && buf_hdr->origin_qe) { - *origin_qe = buf_hdr->origin_qe; - *order = buf_hdr->order; - } else { - get_sched_order(origin_qe, order); - } -} - -int queue_tm_reenq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, - int sustain ODP_UNUSED) -{ - odp_tm_queue_t tm_queue = MAKE_ODP_TM_QUEUE((uint8_t *)queue - - offsetof(tm_queue_obj_t, - tm_qentry)); - odp_packet_t pkt = (odp_packet_t)buf_hdr->handle.handle; - - return odp_tm_enq(tm_queue, pkt); -} - -int queue_tm_reenq_multi(queue_entry_t *queue ODP_UNUSED, - odp_buffer_hdr_t *buf[] ODP_UNUSED, - int num ODP_UNUSED, - int sustain ODP_UNUSED) -{ - ODP_ABORT("Invalid call to queue_tm_reenq_multi()\n"); - return 0; -} - -int queue_tm_reorder(queue_entry_t *queue, - odp_buffer_hdr_t *buf_hdr) -{ - queue_entry_t *origin_qe; - uint64_t order; - - get_queue_order(&origin_qe, &order, buf_hdr); - - if (!origin_qe) - return 0; - - /* Check if we're in order */ - queue_lock(origin_qe); - if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) { - 
queue_unlock(origin_qe); - ODP_ERR("Bad origin queue status\n"); - return 0; - } - - sched_enq_called(); - - /* Wait if it's not our turn */ - if (order > origin_qe->s.order_out) { - reorder_enq(queue, order, origin_qe, buf_hdr, SUSTAIN_ORDER); - queue_unlock(origin_qe); - return 1; - } - - /* Back to TM to handle enqueue - * - * Note: Order will be resolved by a subsequent call to - * odp_schedule_release_ordered() or odp_schedule() as odp_tm_enq() - * calls never resolve order by themselves. - */ - queue_unlock(origin_qe); - return 0; -} - -static int queue_enq_internal(odp_buffer_hdr_t *buf_hdr) -{ - return buf_hdr->target_qe->s.enqueue(buf_hdr->target_qe, buf_hdr, - buf_hdr->flags.sustain); -} - -static int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, - int sustain, queue_entry_t *origin_qe, - uint64_t order) -{ - odp_buffer_hdr_t *reorder_buf; - odp_buffer_hdr_t *next_buf; - odp_buffer_hdr_t *reorder_tail; - odp_buffer_hdr_t *placeholder_buf = NULL; - int release_count, placeholder_count; - int sched = 0; - - /* Need two locks for enq operations from ordered queues */ - get_qe_locks(origin_qe, queue); - - if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY || - queue->s.status < QUEUE_STATUS_READY)) { - free_qe_locks(queue, origin_qe); - ODP_ERR("Bad queue status\n"); - ODP_ERR("queue = %s, origin q = %s, buf = %p\n", - queue->s.name, origin_qe->s.name, buf_hdr); - return -1; - } - - /* Remember that enq was called for this order */ - sched_enq_called(); - - /* We can only complete this enq if we're in order */ - if (order > origin_qe->s.order_out) { - reorder_enq(queue, order, origin_qe, buf_hdr, sustain); - - /* This enq can't complete until order is restored, so - * we're done here. 
- */ - free_qe_locks(queue, origin_qe); - return 0; - } - - /* Resolve order if requested */ - if (!sustain) { - order_release(origin_qe, 1); - sched_order_resolved(buf_hdr); - } - - /* Update queue status */ - if (queue->s.status == QUEUE_STATUS_NOTSCHED) { - queue->s.status = QUEUE_STATUS_SCHED; - sched = 1; - } - - /* We're in order, however the reorder queue may have other buffers - * sharing this order on it and this buffer must not be enqueued ahead - * of them. If the reorder queue is empty we can short-cut and - * simply add to the target queue directly. - */ - - if (!origin_qe->s.reorder_head) { - queue_add_chain(queue, buf_hdr); - free_qe_locks(queue, origin_qe); - - /* Add queue to scheduling */ - if (sched && sched_fn->sched_queue(queue->s.index)) - ODP_ABORT("schedule_queue failed\n"); - return 0; - } - - /* The reorder_queue is non-empty, so sort this buffer into it. Note - * that we force the sustain bit on here because we'll be removing - * this immediately and we already accounted for this order earlier. - */ - reorder_enq(queue, order, origin_qe, buf_hdr, 1); - - /* Pick up this element, and all others resolved by this enq, - * and add them to the target queue. - */ - reorder_deq(queue, origin_qe, &reorder_tail, &placeholder_buf, - &release_count, &placeholder_count); - - /* Move the list from the reorder queue to the target queue */ - if (queue->s.head) - queue->s.tail->next = origin_qe->s.reorder_head; - else - queue->s.head = origin_qe->s.reorder_head; - queue->s.tail = reorder_tail; - origin_qe->s.reorder_head = reorder_tail->next; - reorder_tail->next = NULL; - - /* Reflect resolved orders in the output sequence */ - order_release(origin_qe, release_count + placeholder_count); - - /* Now handle any resolved orders for events destined for other - * queues, appending placeholder bufs as needed. 
- */ - if (origin_qe != queue) - queue_unlock(queue); - - /* Add queue to scheduling */ - if (sched && sched_fn->sched_queue(queue->s.index)) - ODP_ABORT("schedule_queue failed\n"); - - reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, APPEND); - queue_unlock(origin_qe); - - if (reorder_buf) - queue_enq_internal(reorder_buf); - - /* Free all placeholder bufs that are now released */ - while (placeholder_buf) { - next_buf = placeholder_buf->next; - odp_buffer_free(placeholder_buf->handle.handle); - placeholder_buf = next_buf; - } - - return 0; -} - -int schedule_ordered_queue_enq_multi(uint32_t queue_index, void *p_buf_hdr[], - int num, int sustain, int *ret) -{ - queue_entry_t *origin_qe; - uint64_t order; - int i, rc; - queue_entry_t *qe = get_qentry(queue_index); - odp_buffer_hdr_t *first_hdr = p_buf_hdr[0]; - odp_buffer_hdr_t **buf_hdr = (odp_buffer_hdr_t **)p_buf_hdr; - - /* Chain input buffers together */ - for (i = 0; i < num - 1; i++) { - buf_hdr[i]->next = buf_hdr[i + 1]; - buf_hdr[i]->burst_num = 0; - } - - buf_hdr[num - 1]->next = NULL; - - /* Handle ordered enqueues commonly via links */ - get_queue_order(&origin_qe, &order, first_hdr); - if (origin_qe) { - first_hdr->link = first_hdr->next; - rc = ordered_queue_enq(qe, first_hdr, sustain, - origin_qe, order); - *ret = rc == 0 ? 
num : rc; - return 1; - } - - return 0; -} - -int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, - int sustain) -{ - queue_entry_t *origin_qe; - uint64_t order; - int rc; - - /* Special processing needed only if we came from an ordered queue */ - get_queue_order(&origin_qe, &order, buf_hdr); - if (!origin_qe) - return pktout_enqueue(queue, buf_hdr); - - /* Must lock origin_qe for ordered processing */ - queue_lock(origin_qe); - if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) { - queue_unlock(origin_qe); - ODP_ERR("Bad origin queue status\n"); - return -1; - } - - /* We can only complete the enq if we're in order */ - sched_enq_called(); - if (order > origin_qe->s.order_out) { - reorder_enq(queue, order, origin_qe, buf_hdr, sustain); - - /* This enq can't complete until order is restored, so - * we're done here. - */ - queue_unlock(origin_qe); - return 0; - } - - /* Perform our enq since we're in order. - * Note: Don't hold the origin_qe lock across an I/O operation! - */ - queue_unlock(origin_qe); - - /* Handle any chained buffers (internal calls) */ - if (buf_hdr->link) { - odp_buffer_hdr_t *buf_hdrs[QUEUE_MULTI_MAX]; - odp_buffer_hdr_t *next_buf; - int num = 0; - - next_buf = buf_hdr->link; - buf_hdr->link = NULL; - - while (next_buf) { - buf_hdrs[num++] = next_buf; - next_buf = next_buf->next; - } - - rc = pktout_enq_multi(queue, buf_hdrs, num); - if (rc < num) - return -1; - } else { - rc = pktout_enqueue(queue, buf_hdr); - if (rc) - return rc; - } - - /* Reacquire the lock following the I/O send. Note that we're still - * guaranteed to be in order here since we haven't released - * order yet. 
- */ - queue_lock(origin_qe); - if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) { - queue_unlock(origin_qe); - ODP_ERR("Bad origin queue status\n"); - return -1; - } - - /* Account for this ordered enq */ - if (!sustain) { - order_release(origin_qe, 1); - sched_order_resolved(NULL); - } - - /* Now check to see if our successful enq has unblocked other buffers - * in the origin's reorder queue. - */ - odp_buffer_hdr_t *reorder_buf; - odp_buffer_hdr_t *next_buf; - odp_buffer_hdr_t *reorder_tail; - odp_buffer_hdr_t *xmit_buf; - odp_buffer_hdr_t *placeholder_buf; - int release_count, placeholder_count; - - /* Send released buffers as well */ - if (reorder_deq(queue, origin_qe, &reorder_tail, &placeholder_buf, - &release_count, &placeholder_count)) { - xmit_buf = origin_qe->s.reorder_head; - origin_qe->s.reorder_head = reorder_tail->next; - reorder_tail->next = NULL; - queue_unlock(origin_qe); - - do { - next_buf = xmit_buf->next; - pktout_enqueue(queue, xmit_buf); - xmit_buf = next_buf; - } while (xmit_buf); - - /* Reacquire the origin_qe lock to continue */ - queue_lock(origin_qe); - if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) { - queue_unlock(origin_qe); - ODP_ERR("Bad origin queue status\n"); - return -1; - } - } - - /* Update the order sequence to reflect the deq'd elements */ - order_release(origin_qe, release_count + placeholder_count); - - /* Now handle sends to other queues that are ready to go */ - reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, APPEND); - - /* We're fully done with the origin_qe at last */ - queue_unlock(origin_qe); - - /* Now send the next buffer to its target queue */ - if (reorder_buf) - queue_enq_internal(reorder_buf); - - /* Free all placeholder bufs that are now released */ - while (placeholder_buf) { - next_buf = placeholder_buf->next; - odp_buffer_free(placeholder_buf->handle.handle); - placeholder_buf = next_buf; - } - - return 0; -} - -int queue_pktout_enq_multi(queue_entry_t *queue, 
odp_buffer_hdr_t *buf_hdr[], - int num, int sustain) -{ - int i, rc; - queue_entry_t *origin_qe; - uint64_t order; - - /* If we're not ordered, handle directly */ - get_queue_order(&origin_qe, &order, buf_hdr[0]); - if (!origin_qe) - return pktout_enq_multi(queue, buf_hdr, num); - - /* Chain input buffers together */ - for (i = 0; i < num - 1; i++) - buf_hdr[i]->next = buf_hdr[i + 1]; - - buf_hdr[num - 1]->next = NULL; - - /* Handle commonly via links */ - buf_hdr[0]->link = buf_hdr[0]->next; - rc = queue_pktout_enq(queue, buf_hdr[0], sustain); - return rc == 0 ? num : rc; -} - -/* These routines exists here rather than in odp_schedule - * because they operate on queue interenal structures - */ -int release_order(void *origin_qe_ptr, uint64_t order, - odp_pool_t pool, int enq_called) -{ - odp_buffer_t placeholder_buf; - odp_buffer_hdr_t *placeholder_buf_hdr, *reorder_buf, *next_buf; - queue_entry_t *origin_qe = origin_qe_ptr; - - /* Must lock the origin queue to process the release */ - queue_lock(origin_qe); - - /* If we are in order we can release immediately since there can be no - * confusion about intermediate elements - */ - if (order <= origin_qe->s.order_out) { - reorder_buf = origin_qe->s.reorder_head; - - /* We're in order, however there may be one or more events on - * the reorder queue that are part of this order. If that is - * the case, remove them and let ordered_queue_enq() handle - * them and resolve the order for us. 
- */ - if (reorder_buf && reorder_buf->order == order) { - odp_buffer_hdr_t *reorder_head = reorder_buf; - - next_buf = reorder_buf->next; - - while (next_buf && next_buf->order == order) { - reorder_buf = next_buf; - next_buf = next_buf->next; - } - - origin_qe->s.reorder_head = reorder_buf->next; - reorder_buf->next = NULL; - - queue_unlock(origin_qe); - reorder_head->link = reorder_buf->next; - return ordered_queue_enq(reorder_head->target_qe, - reorder_head, RESOLVE_ORDER, - origin_qe, order); - } - - /* Reorder queue has no elements for this order, so it's safe - * to resolve order here - */ - order_release(origin_qe, 1); - - /* Check if this release allows us to unblock waiters. At the - * point of this call, the reorder list may contain zero or - * more placeholders that need to be freed, followed by zero - * or one complete reorder buffer chain. Note that since we - * are releasing order, we know no further enqs for this order - * can occur, so ignore the sustain bit to clear out our - * element(s) on the reorder queue - */ - reorder_complete(origin_qe, &reorder_buf, - &placeholder_buf_hdr, NOAPPEND); - - /* Now safe to unlock */ - queue_unlock(origin_qe); - - /* If reorder_buf has a target, do the enq now */ - if (reorder_buf) - queue_enq_internal(reorder_buf); - - while (placeholder_buf_hdr) { - odp_buffer_hdr_t *placeholder_next = - placeholder_buf_hdr->next; - - odp_buffer_free(placeholder_buf_hdr->handle.handle); - placeholder_buf_hdr = placeholder_next; - } - - return 0; - } - - /* If we are not in order we need a placeholder to represent our - * "place in line" unless we have issued enqs, in which case we - * already have a place in the reorder queue. If we need a - * placeholder, use an element from the same pool we were scheduled - * with is from, otherwise just ensure that the final element for our - * order is not marked sustain. 
- */ - if (enq_called) { - reorder_buf = NULL; - next_buf = origin_qe->s.reorder_head; - - while (next_buf && next_buf->order <= order) { - reorder_buf = next_buf; - next_buf = next_buf->next; - } - - if (reorder_buf && reorder_buf->order == order) { - reorder_buf->flags.sustain = 0; - queue_unlock(origin_qe); - return 0; - } - } - - placeholder_buf = odp_buffer_alloc(pool); - - /* Can't release if no placeholder is available */ - if (odp_unlikely(placeholder_buf == ODP_BUFFER_INVALID)) { - queue_unlock(origin_qe); - return -1; - } - - placeholder_buf_hdr = buf_hdl_to_hdr(placeholder_buf); - - /* Copy info to placeholder and add it to the reorder queue */ - placeholder_buf_hdr->origin_qe = origin_qe; - placeholder_buf_hdr->order = order; - placeholder_buf_hdr->flags.sustain = 0; - - reorder_enq(NULL, order, origin_qe, placeholder_buf_hdr, 0); - - queue_unlock(origin_qe); - return 0; -} - -void schedule_order_lock(unsigned lock_index) -{ - queue_entry_t *origin_qe; - uint64_t sync, sync_out; - - origin_qe = sched_local.origin_qe; - if (!origin_qe || lock_index >= origin_qe->s.param.sched.lock_count) - return; - - sync = sched_local.sync[lock_index]; - sync_out = odp_atomic_load_u64(&origin_qe->s.sync_out[lock_index]); - ODP_ASSERT(sync >= sync_out); - - /* Wait until we are in order. Note that sync_out will be incremented - * both by unlocks as well as order resolution, so we're OK if only - * some events in the ordered flow need to lock. 
- */ - while (sync != sync_out) { - odp_cpu_pause(); - sync_out = - odp_atomic_load_u64(&origin_qe->s.sync_out[lock_index]); - } -} - -void schedule_order_unlock(unsigned lock_index) -{ - queue_entry_t *origin_qe; - - origin_qe = sched_local.origin_qe; - if (!origin_qe || lock_index >= origin_qe->s.param.sched.lock_count) - return; - ODP_ASSERT(sched_local.sync[lock_index] == - odp_atomic_load_u64(&origin_qe->s.sync_out[lock_index])); - - /* Release the ordered lock */ - odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out[lock_index]); -} - -void cache_order_info(uint32_t queue_index) -{ - uint32_t i; - queue_entry_t *qe = get_qentry(queue_index); - odp_event_t ev = sched_local.ev_stash[0]; - odp_buffer_hdr_t *buf_hdr = buf_hdl_to_hdr(odp_buffer_from_event(ev)); - - sched_local.origin_qe = qe; - sched_local.order = buf_hdr->order; - sched_local.pool = buf_hdr->pool_hdl; - - for (i = 0; i < qe->s.param.sched.lock_count; i++) - sched_local.sync[i] = buf_hdr->sync[i]; - - sched_local.enq_called = 0; -} diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c index 5090a5c..069b8bf 100644 --- a/platform/linux-generic/odp_schedule_sp.c +++ b/platform/linux-generic/odp_schedule_sp.c @@ -299,12 +299,11 @@ static int sched_queue(uint32_t qi) } static int ord_enq_multi(uint32_t queue_index, void *buf_hdr[], int num, - int sustain, int *ret) + int *ret) { (void)queue_index; (void)buf_hdr; (void)num; - (void)sustain; (void)ret; /* didn't consume the events */ diff --git a/platform/linux-generic/odp_traffic_mngr.c b/platform/linux-generic/odp_traffic_mngr.c index ffb149b..6a660c5 100644 --- a/platform/linux-generic/odp_traffic_mngr.c +++ b/platform/linux-generic/odp_traffic_mngr.c @@ -99,6 +99,24 @@ static odp_bool_t tm_demote_pkt_desc(tm_system_t *tm_system, tm_shaper_obj_t *timer_shaper, pkt_desc_t *demoted_pkt_desc); +static int queue_tm_reenq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) +{ + odp_tm_queue_t tm_queue = 
MAKE_ODP_TM_QUEUE((uint8_t *)queue - + offsetof(tm_queue_obj_t, + tm_qentry)); + odp_packet_t pkt = (odp_packet_t)buf_hdr->handle.handle; + + return odp_tm_enq(tm_queue, pkt); +} + +static int queue_tm_reenq_multi(queue_entry_t *queue ODP_UNUSED, + odp_buffer_hdr_t *buf[] ODP_UNUSED, + int num ODP_UNUSED) +{ + ODP_ABORT("Invalid call to queue_tm_reenq_multi()\n"); + return 0; +} + static tm_queue_obj_t *get_tm_queue_obj(tm_system_t *tm_system, pkt_desc_t *pkt_desc) { @@ -1860,13 +1878,6 @@ static int tm_enqueue(tm_system_t *tm_system, odp_bool_t drop_eligible, drop; uint32_t frame_len, pkt_depth; int rc; - odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt); - - /* If we're from an ordered queue and not in order - * record the event and wait until order is resolved - */ - if (queue_tm_reorder(&tm_queue_obj->tm_qentry, &pkt_hdr->buf_hdr)) - return 0; if (tm_system->first_enq == 0) { odp_barrier_wait(&tm_system->tm_system_barrier); @@ -1886,7 +1897,10 @@ static int tm_enqueue(tm_system_t *tm_system, work_item.queue_num = tm_queue_obj->queue_num; work_item.pkt = pkt; + sched_fn->order_lock(); rc = input_work_queue_append(tm_system, &work_item); + sched_fn->order_unlock(); + if (rc < 0) { ODP_DBG("%s work queue full\n", __func__); return rc; diff --git a/platform/linux-generic/pktio/loop.c b/platform/linux-generic/pktio/loop.c index 28dd404..7096283 100644 --- a/platform/linux-generic/pktio/loop.c +++ b/platform/linux-generic/pktio/loop.c @@ -169,7 +169,7 @@ static int loopback_send(pktio_entry_t *pktio_entry, int index ODP_UNUSED, odp_ticketlock_lock(&pktio_entry->s.txl); qentry = queue_to_qentry(pktio_entry->s.pkt_loop.loopq); - ret = queue_enq_multi(qentry, hdr_tbl, len, 0); + ret = queue_enq_multi(qentry, hdr_tbl, len); if (ret > 0) { pktio_entry->s.stats.out_ucast_pkts += ret; -- 2.7.4