This allows better readability, and make supporting ordered queues
 easier for other platforms

Signed-off-by: Nicolas Morey-Chaisemartin <[email protected]>
---
 platform/linux-generic/odp_queue.c | 225 +++++++++++++++++++++----------------
 1 file changed, 127 insertions(+), 98 deletions(-)

diff --git a/platform/linux-generic/odp_queue.c 
b/platform/linux-generic/odp_queue.c
index ac933da..cf8e3ef 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -341,142 +341,171 @@ odp_queue_t odp_queue_lookup(const char *name)
        return ODP_QUEUE_INVALID;
 }
 
+/* Update queue head and/or tail and schedule status
+ * Return if the queue needs to be reschedule.
+ * Queue must be locked before calling this function
+ */
+static int _queue_enq_update(queue_entry_t *queue, odp_buffer_hdr_t *head,
+                            odp_buffer_hdr_t *tail){
+       if (!queue->s.head) {
+               /* Empty queue */
+               queue->s.head = head;
+               queue->s.tail = tail;
+       } else {
+               queue->s.tail->next = head;
+               queue->s.tail = tail;
+       }
+       tail->next = NULL;
 
-int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
+       if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
+               queue->s.status = QUEUE_STATUS_SCHED;
+               return  1; /* retval: schedule queue */
+       }
+       return 0;
+}
+
+static int _queue_enq_ordered(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
+                             int sustain, uint64_t order,
+                             queue_entry_t *origin_qe)
 {
        int sched = 0;
-       queue_entry_t *origin_qe;
-       uint64_t order;
        odp_buffer_hdr_t *buf_tail;
 
-       get_queue_order(&origin_qe, &order, buf_hdr);
+       LOCK(&origin_qe->s.lock);
 
        /* Need two locks for enq operations from ordered queues */
-       if (origin_qe) {
+       while (!LOCK_TRY(&queue->s.lock)) {
+               UNLOCK(&origin_qe->s.lock);
                LOCK(&origin_qe->s.lock);
-               while (!LOCK_TRY(&queue->s.lock)) {
-                       UNLOCK(&origin_qe->s.lock);
-                       LOCK(&origin_qe->s.lock);
-               }
-               if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
-                       UNLOCK(&queue->s.lock);
-                       UNLOCK(&origin_qe->s.lock);
-                       ODP_ERR("Bad origin queue status\n");
-                       ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
-                               queue->s.name, origin_qe->s.name, buf_hdr);
-                       return -1;
-               }
-       } else {
-               LOCK(&queue->s.lock);
+       }
+
+       if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+               UNLOCK(&queue->s.lock);
+               UNLOCK(&origin_qe->s.lock);
+               ODP_ERR("Bad origin queue status\n");
+               ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
+                       queue->s.name, origin_qe->s.name, buf_hdr);
+               return -1;
        }
 
        if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
                UNLOCK(&queue->s.lock);
-               if (origin_qe)
-                       UNLOCK(&origin_qe->s.lock);
+               UNLOCK(&origin_qe->s.lock);
                ODP_ERR("Bad queue status\n");
                return -1;
        }
 
        /* We can only complete the enq if we're in order */
-       if (origin_qe) {
-               sched_enq_called();
-               if (order > origin_qe->s.order_out) {
-                       reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
+       sched_enq_called();
+       if (order > origin_qe->s.order_out) {
+               reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
 
-                       /* This enq can't complete until order is restored, so
-                        * we're done here.
-                        */
-                       UNLOCK(&queue->s.lock);
-                       UNLOCK(&origin_qe->s.lock);
-                       return 0;
-               }
+               /* This enq can't complete until order is restored, so
+                * we're done here.
+                */
+               UNLOCK(&queue->s.lock);
+               UNLOCK(&origin_qe->s.lock);
+               return 0;
+       }
 
-               /* We're in order, so account for this and proceed with enq */
-               if (!sustain) {
-                       order_release(origin_qe, 1);
-                       sched_order_resolved(buf_hdr);
-               }
+       /* We're in order, so account for this and proceed with enq */
+       if (!sustain) {
+               order_release(origin_qe, 1);
+               sched_order_resolved(buf_hdr);
+       }
 
-               /* if this element is linked, restore the linked chain */
-               buf_tail = buf_hdr->link;
+       /* if this element is linked, restore the linked chain */
+       buf_tail = buf_hdr->link;
 
-               if (buf_tail) {
-                       buf_hdr->next = buf_tail;
-                       buf_hdr->link = NULL;
+       if (buf_tail) {
+               buf_hdr->next = buf_tail;
+               buf_hdr->link = NULL;
 
-                       /* find end of the chain */
-                       while (buf_tail->next)
-                               buf_tail = buf_tail->next;
-               } else {
-                       buf_tail = buf_hdr;
-               }
+               /* find end of the chain */
+               while (buf_tail->next)
+                       buf_tail = buf_tail->next;
        } else {
                buf_tail = buf_hdr;
        }
 
-       if (!queue->s.head) {
-               /* Empty queue */
-               queue->s.head = buf_hdr;
-               queue->s.tail = buf_tail;
-               buf_tail->next = NULL;
-       } else {
-               queue->s.tail->next = buf_hdr;
-               queue->s.tail = buf_tail;
-               buf_tail->next = NULL;
-       }
-
-       if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
-               queue->s.status = QUEUE_STATUS_SCHED;
-               sched = 1; /* retval: schedule queue */
-       }
+       sched = _queue_enq_update(queue, buf_hdr, buf_tail);
 
        /*
         * If we came from an ordered queue, check to see if our successful
         * enq has unblocked other buffers in the origin's reorder queue.
         */
-       if (origin_qe) {
-               odp_buffer_hdr_t *reorder_buf;
-               odp_buffer_hdr_t *next_buf;
-               odp_buffer_hdr_t *reorder_prev;
-               odp_buffer_hdr_t *placeholder_buf;
-               int               deq_count, release_count, placeholder_count;
-
-               deq_count = reorder_deq(queue, origin_qe, &reorder_buf,
-                                       &reorder_prev, &placeholder_buf,
-                                       &release_count, &placeholder_count);
-
-               /* Add released buffers to the queue as well */
-               if (deq_count > 0) {
-                       queue->s.tail->next       = origin_qe->s.reorder_head;
-                       queue->s.tail             = reorder_prev;
-                       origin_qe->s.reorder_head = reorder_prev->next;
-                       reorder_prev->next        = NULL;
-               }
+       odp_buffer_hdr_t *reorder_buf;
+       odp_buffer_hdr_t *next_buf;
+       odp_buffer_hdr_t *reorder_prev;
+       odp_buffer_hdr_t *placeholder_buf;
+       int               deq_count, release_count, placeholder_count;
 
-               /* Reflect resolved orders in the output sequence */
-               order_release(origin_qe, release_count + placeholder_count);
+       deq_count = reorder_deq(queue, origin_qe, &reorder_buf,
+                               &reorder_prev, &placeholder_buf,
+                               &release_count, &placeholder_count);
 
-               /* Now handle any unblocked complete buffers destined for
-                * other queues, appending placeholder bufs as needed.
-                */
-               UNLOCK(&queue->s.lock);
-               reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1);
-               UNLOCK(&origin_qe->s.lock);
+       /* Add released buffers to the queue as well */
+       if (deq_count > 0) {
+               queue->s.tail->next       = origin_qe->s.reorder_head;
+               queue->s.tail             = reorder_prev;
+               origin_qe->s.reorder_head = reorder_prev->next;
+               reorder_prev->next        = NULL;
+       }
 
-               if (reorder_buf)
-                       queue_enq_internal(reorder_buf);
+       /* Reflect resolved orders in the output sequence */
+       order_release(origin_qe, release_count + placeholder_count);
 
-               /* Free all placeholder bufs that are now released */
-               while (placeholder_buf) {
-                       next_buf = placeholder_buf->next;
-                       odp_buffer_free(placeholder_buf->handle.handle);
-                       placeholder_buf = next_buf;
-               }
-       } else {
+       /* Now handle any unblocked complete buffers destined for
+        * other queues, appending placeholder bufs as needed.
+        */
+       UNLOCK(&queue->s.lock);
+       reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1);
+       UNLOCK(&origin_qe->s.lock);
+
+       if (reorder_buf)
+               queue_enq_internal(reorder_buf);
+
+       /* Free all placeholder bufs that are now released */
+       while (placeholder_buf) {
+               next_buf = placeholder_buf->next;
+               odp_buffer_free(placeholder_buf->handle.handle);
+               placeholder_buf = next_buf;
+       }
+
+       /* Add queue to scheduling */
+       if (sched && schedule_queue(queue))
+               ODP_ABORT("schedule_queue failed\n");
+
+       return 0;
+}
+
+int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
+{
+       int sched = 0;
+       queue_entry_t *origin_qe;
+       uint64_t order;
+       odp_buffer_hdr_t *buf_tail;
+
+       get_queue_order(&origin_qe, &order, buf_hdr);
+
+       if (origin_qe)
+               return _queue_enq_ordered(queue, buf_hdr, sustain,
+                                         order, origin_qe);
+
+       LOCK(&queue->s.lock);
+
+       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
                UNLOCK(&queue->s.lock);
+               ODP_ERR("Bad queue status\n");
+               return -1;
        }
 
+       buf_tail = buf_hdr;
+
+       sched = _queue_enq_update(queue, buf_hdr, buf_tail);
+
+       UNLOCK(&queue->s.lock);
+
        /* Add queue to scheduling */
        if (sched && schedule_queue(queue))
                ODP_ABORT("schedule_queue failed\n");
_______________________________________________
lng-odp mailing list
[email protected]
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to