This improves readability and makes supporting ordered queues easier for other platforms.
Signed-off-by: Nicolas Morey-Chaisemartin <[email protected]> --- platform/linux-generic/odp_queue.c | 225 +++++++++++++++++++++---------------- 1 file changed, 127 insertions(+), 98 deletions(-) diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index ac933da..cf8e3ef 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -341,142 +341,171 @@ odp_queue_t odp_queue_lookup(const char *name) return ODP_QUEUE_INVALID; } +/* Update queue head and/or tail and schedule status + * Return if the queue needs to be reschedule. + * Queue must be locked before calling this function + */ +static int _queue_enq_update(queue_entry_t *queue, odp_buffer_hdr_t *head, + odp_buffer_hdr_t *tail){ + if (!queue->s.head) { + /* Empty queue */ + queue->s.head = head; + queue->s.tail = tail; + } else { + queue->s.tail->next = head; + queue->s.tail = tail; + } + tail->next = NULL; -int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain) + if (queue->s.status == QUEUE_STATUS_NOTSCHED) { + queue->s.status = QUEUE_STATUS_SCHED; + return 1; /* retval: schedule queue */ + } + return 0; +} + +static int _queue_enq_ordered(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, + int sustain, uint64_t order, + queue_entry_t *origin_qe) { int sched = 0; - queue_entry_t *origin_qe; - uint64_t order; odp_buffer_hdr_t *buf_tail; - get_queue_order(&origin_qe, &order, buf_hdr); + LOCK(&origin_qe->s.lock); /* Need two locks for enq operations from ordered queues */ - if (origin_qe) { + while (!LOCK_TRY(&queue->s.lock)) { + UNLOCK(&origin_qe->s.lock); LOCK(&origin_qe->s.lock); - while (!LOCK_TRY(&queue->s.lock)) { - UNLOCK(&origin_qe->s.lock); - LOCK(&origin_qe->s.lock); - } - if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) { - UNLOCK(&queue->s.lock); - UNLOCK(&origin_qe->s.lock); - ODP_ERR("Bad origin queue status\n"); - ODP_ERR("queue = %s, origin q = %s, buf = %p\n", - queue->s.name, origin_qe->s.name, 
buf_hdr); - return -1; - } - } else { - LOCK(&queue->s.lock); + } + + if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) { + UNLOCK(&queue->s.lock); + UNLOCK(&origin_qe->s.lock); + ODP_ERR("Bad origin queue status\n"); + ODP_ERR("queue = %s, origin q = %s, buf = %p\n", + queue->s.name, origin_qe->s.name, buf_hdr); + return -1; } if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { UNLOCK(&queue->s.lock); - if (origin_qe) - UNLOCK(&origin_qe->s.lock); + UNLOCK(&origin_qe->s.lock); ODP_ERR("Bad queue status\n"); return -1; } /* We can only complete the enq if we're in order */ - if (origin_qe) { - sched_enq_called(); - if (order > origin_qe->s.order_out) { - reorder_enq(queue, order, origin_qe, buf_hdr, sustain); + sched_enq_called(); + if (order > origin_qe->s.order_out) { + reorder_enq(queue, order, origin_qe, buf_hdr, sustain); - /* This enq can't complete until order is restored, so - * we're done here. - */ - UNLOCK(&queue->s.lock); - UNLOCK(&origin_qe->s.lock); - return 0; - } + /* This enq can't complete until order is restored, so + * we're done here. 
+ */ + UNLOCK(&queue->s.lock); + UNLOCK(&origin_qe->s.lock); + return 0; + } - /* We're in order, so account for this and proceed with enq */ - if (!sustain) { - order_release(origin_qe, 1); - sched_order_resolved(buf_hdr); - } + /* We're in order, so account for this and proceed with enq */ + if (!sustain) { + order_release(origin_qe, 1); + sched_order_resolved(buf_hdr); + } - /* if this element is linked, restore the linked chain */ - buf_tail = buf_hdr->link; + /* if this element is linked, restore the linked chain */ + buf_tail = buf_hdr->link; - if (buf_tail) { - buf_hdr->next = buf_tail; - buf_hdr->link = NULL; + if (buf_tail) { + buf_hdr->next = buf_tail; + buf_hdr->link = NULL; - /* find end of the chain */ - while (buf_tail->next) - buf_tail = buf_tail->next; - } else { - buf_tail = buf_hdr; - } + /* find end of the chain */ + while (buf_tail->next) + buf_tail = buf_tail->next; } else { buf_tail = buf_hdr; } - if (!queue->s.head) { - /* Empty queue */ - queue->s.head = buf_hdr; - queue->s.tail = buf_tail; - buf_tail->next = NULL; - } else { - queue->s.tail->next = buf_hdr; - queue->s.tail = buf_tail; - buf_tail->next = NULL; - } - - if (queue->s.status == QUEUE_STATUS_NOTSCHED) { - queue->s.status = QUEUE_STATUS_SCHED; - sched = 1; /* retval: schedule queue */ - } + sched = _queue_enq_update(queue, buf_hdr, buf_tail); /* * If we came from an ordered queue, check to see if our successful * enq has unblocked other buffers in the origin's reorder queue. 
*/ - if (origin_qe) { - odp_buffer_hdr_t *reorder_buf; - odp_buffer_hdr_t *next_buf; - odp_buffer_hdr_t *reorder_prev; - odp_buffer_hdr_t *placeholder_buf; - int deq_count, release_count, placeholder_count; - - deq_count = reorder_deq(queue, origin_qe, &reorder_buf, - &reorder_prev, &placeholder_buf, - &release_count, &placeholder_count); - - /* Add released buffers to the queue as well */ - if (deq_count > 0) { - queue->s.tail->next = origin_qe->s.reorder_head; - queue->s.tail = reorder_prev; - origin_qe->s.reorder_head = reorder_prev->next; - reorder_prev->next = NULL; - } + odp_buffer_hdr_t *reorder_buf; + odp_buffer_hdr_t *next_buf; + odp_buffer_hdr_t *reorder_prev; + odp_buffer_hdr_t *placeholder_buf; + int deq_count, release_count, placeholder_count; - /* Reflect resolved orders in the output sequence */ - order_release(origin_qe, release_count + placeholder_count); + deq_count = reorder_deq(queue, origin_qe, &reorder_buf, + &reorder_prev, &placeholder_buf, + &release_count, &placeholder_count); - /* Now handle any unblocked complete buffers destined for - * other queues, appending placeholder bufs as needed. 
- */ - UNLOCK(&queue->s.lock); - reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1); - UNLOCK(&origin_qe->s.lock); + /* Add released buffers to the queue as well */ + if (deq_count > 0) { + queue->s.tail->next = origin_qe->s.reorder_head; + queue->s.tail = reorder_prev; + origin_qe->s.reorder_head = reorder_prev->next; + reorder_prev->next = NULL; + } - if (reorder_buf) - queue_enq_internal(reorder_buf); + /* Reflect resolved orders in the output sequence */ + order_release(origin_qe, release_count + placeholder_count); - /* Free all placeholder bufs that are now released */ - while (placeholder_buf) { - next_buf = placeholder_buf->next; - odp_buffer_free(placeholder_buf->handle.handle); - placeholder_buf = next_buf; - } - } else { + /* Now handle any unblocked complete buffers destined for + * other queues, appending placeholder bufs as needed. + */ + UNLOCK(&queue->s.lock); + reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1); + UNLOCK(&origin_qe->s.lock); + + if (reorder_buf) + queue_enq_internal(reorder_buf); + + /* Free all placeholder bufs that are now released */ + while (placeholder_buf) { + next_buf = placeholder_buf->next; + odp_buffer_free(placeholder_buf->handle.handle); + placeholder_buf = next_buf; + } + + /* Add queue to scheduling */ + if (sched && schedule_queue(queue)) + ODP_ABORT("schedule_queue failed\n"); + + return 0; +} + +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain) +{ + int sched = 0; + queue_entry_t *origin_qe; + uint64_t order; + odp_buffer_hdr_t *buf_tail; + + get_queue_order(&origin_qe, &order, buf_hdr); + + if (origin_qe) + return _queue_enq_ordered(queue, buf_hdr, sustain, + order, origin_qe); + + LOCK(&queue->s.lock); + + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { UNLOCK(&queue->s.lock); + ODP_ERR("Bad queue status\n"); + return -1; } + buf_tail = buf_hdr; + + sched = _queue_enq_update(queue, buf_hdr, buf_tail); + + UNLOCK(&queue->s.lock); + /* Add queue to 
scheduling */ if (sched && schedule_queue(queue)) ODP_ABORT("schedule_queue failed\n"); _______________________________________________ lng-odp mailing list [email protected] https://lists.linaro.org/mailman/listinfo/lng-odp
