Ping

On 09/10/2015 05:22 PM, Nicolas Morey-Chaisemartin wrote:
> This allows better readability, and makes supporting ordered queues
>  easier for other platforms
>
> Signed-off-by: Nicolas Morey-Chaisemartin <[email protected]>
> ---
>  platform/linux-generic/odp_queue.c | 225 +++++++++++++++++++++----------------
>  1 file changed, 127 insertions(+), 98 deletions(-)
>
> diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
> index ac933da..cf8e3ef 100644
> --- a/platform/linux-generic/odp_queue.c
> +++ b/platform/linux-generic/odp_queue.c
> @@ -341,142 +341,171 @@ odp_queue_t odp_queue_lookup(const char *name)
>       return ODP_QUEUE_INVALID;
>  }
>  
> +/* Update queue head and/or tail and schedule status
> + * Return 1 if the queue needs to be scheduled, 0 otherwise.
> + * Queue must be locked before calling this function
> + */
> +static int _queue_enq_update(queue_entry_t *queue, odp_buffer_hdr_t *head,
> +                          odp_buffer_hdr_t *tail){
> +     if (!queue->s.head) {
> +             /* Empty queue */
> +             queue->s.head = head;
> +             queue->s.tail = tail;
> +     } else {
> +             queue->s.tail->next = head;
> +             queue->s.tail = tail;
> +     }
> +     tail->next = NULL;
>  
> -int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
> +     if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
> +             queue->s.status = QUEUE_STATUS_SCHED;
> +             return  1; /* retval: schedule queue */
> +     }
> +     return 0;
> +}
> +
> +static int _queue_enq_ordered(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
> +                           int sustain, uint64_t order,
> +                           queue_entry_t *origin_qe)
>  {
>       int sched = 0;
> -     queue_entry_t *origin_qe;
> -     uint64_t order;
>       odp_buffer_hdr_t *buf_tail;
>  
> -     get_queue_order(&origin_qe, &order, buf_hdr);
> +     LOCK(&origin_qe->s.lock);
>  
>       /* Need two locks for enq operations from ordered queues */
> -     if (origin_qe) {
> +     while (!LOCK_TRY(&queue->s.lock)) {
> +             UNLOCK(&origin_qe->s.lock);
>               LOCK(&origin_qe->s.lock);
> -             while (!LOCK_TRY(&queue->s.lock)) {
> -                     UNLOCK(&origin_qe->s.lock);
> -                     LOCK(&origin_qe->s.lock);
> -             }
> -             if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
> -                     UNLOCK(&queue->s.lock);
> -                     UNLOCK(&origin_qe->s.lock);
> -                     ODP_ERR("Bad origin queue status\n");
> -                     ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
> -                             queue->s.name, origin_qe->s.name, buf_hdr);
> -                     return -1;
> -             }
> -     } else {
> -             LOCK(&queue->s.lock);
> +     }
> +
> +     if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
> +             UNLOCK(&queue->s.lock);
> +             UNLOCK(&origin_qe->s.lock);
> +             ODP_ERR("Bad origin queue status\n");
> +             ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
> +                     queue->s.name, origin_qe->s.name, buf_hdr);
> +             return -1;
>       }
>  
>       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
>               UNLOCK(&queue->s.lock);
> -             if (origin_qe)
> -                     UNLOCK(&origin_qe->s.lock);
> +             UNLOCK(&origin_qe->s.lock);
>               ODP_ERR("Bad queue status\n");
>               return -1;
>       }
>  
>       /* We can only complete the enq if we're in order */
> -     if (origin_qe) {
> -             sched_enq_called();
> -             if (order > origin_qe->s.order_out) {
> -                     reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
> +     sched_enq_called();
> +     if (order > origin_qe->s.order_out) {
> +             reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
>  
> -                     /* This enq can't complete until order is restored, so
> -                      * we're done here.
> -                      */
> -                     UNLOCK(&queue->s.lock);
> -                     UNLOCK(&origin_qe->s.lock);
> -                     return 0;
> -             }
> +             /* This enq can't complete until order is restored, so
> +              * we're done here.
> +              */
> +             UNLOCK(&queue->s.lock);
> +             UNLOCK(&origin_qe->s.lock);
> +             return 0;
> +     }
>  
> -             /* We're in order, so account for this and proceed with enq */
> -             if (!sustain) {
> -                     order_release(origin_qe, 1);
> -                     sched_order_resolved(buf_hdr);
> -             }
> +     /* We're in order, so account for this and proceed with enq */
> +     if (!sustain) {
> +             order_release(origin_qe, 1);
> +             sched_order_resolved(buf_hdr);
> +     }
>  
> -             /* if this element is linked, restore the linked chain */
> -             buf_tail = buf_hdr->link;
> +     /* if this element is linked, restore the linked chain */
> +     buf_tail = buf_hdr->link;
>  
> -             if (buf_tail) {
> -                     buf_hdr->next = buf_tail;
> -                     buf_hdr->link = NULL;
> +     if (buf_tail) {
> +             buf_hdr->next = buf_tail;
> +             buf_hdr->link = NULL;
>  
> -                     /* find end of the chain */
> -                     while (buf_tail->next)
> -                             buf_tail = buf_tail->next;
> -             } else {
> -                     buf_tail = buf_hdr;
> -             }
> +             /* find end of the chain */
> +             while (buf_tail->next)
> +                     buf_tail = buf_tail->next;
>       } else {
>               buf_tail = buf_hdr;
>       }
>  
> -     if (!queue->s.head) {
> -             /* Empty queue */
> -             queue->s.head = buf_hdr;
> -             queue->s.tail = buf_tail;
> -             buf_tail->next = NULL;
> -     } else {
> -             queue->s.tail->next = buf_hdr;
> -             queue->s.tail = buf_tail;
> -             buf_tail->next = NULL;
> -     }
> -
> -     if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
> -             queue->s.status = QUEUE_STATUS_SCHED;
> -             sched = 1; /* retval: schedule queue */
> -     }
> +     sched = _queue_enq_update(queue, buf_hdr, buf_tail);
>  
>       /*
>        * If we came from an ordered queue, check to see if our successful
>        * enq has unblocked other buffers in the origin's reorder queue.
>        */
> -     if (origin_qe) {
> -             odp_buffer_hdr_t *reorder_buf;
> -             odp_buffer_hdr_t *next_buf;
> -             odp_buffer_hdr_t *reorder_prev;
> -             odp_buffer_hdr_t *placeholder_buf;
> -             int               deq_count, release_count, placeholder_count;
> -
> -             deq_count = reorder_deq(queue, origin_qe, &reorder_buf,
> -                                     &reorder_prev, &placeholder_buf,
> -                                     &release_count, &placeholder_count);
> -
> -             /* Add released buffers to the queue as well */
> -             if (deq_count > 0) {
> -                     queue->s.tail->next       = origin_qe->s.reorder_head;
> -                     queue->s.tail             = reorder_prev;
> -                     origin_qe->s.reorder_head = reorder_prev->next;
> -                     reorder_prev->next        = NULL;
> -             }
> +     odp_buffer_hdr_t *reorder_buf;
> +     odp_buffer_hdr_t *next_buf;
> +     odp_buffer_hdr_t *reorder_prev;
> +     odp_buffer_hdr_t *placeholder_buf;
> +     int               deq_count, release_count, placeholder_count;
>  
> -             /* Reflect resolved orders in the output sequence */
> -             order_release(origin_qe, release_count + placeholder_count);
> +     deq_count = reorder_deq(queue, origin_qe, &reorder_buf,
> +                             &reorder_prev, &placeholder_buf,
> +                             &release_count, &placeholder_count);
>  
> -             /* Now handle any unblocked complete buffers destined for
> -              * other queues, appending placeholder bufs as needed.
> -              */
> -             UNLOCK(&queue->s.lock);
> -             reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1);
> -             UNLOCK(&origin_qe->s.lock);
> +     /* Add released buffers to the queue as well */
> +     if (deq_count > 0) {
> +             queue->s.tail->next       = origin_qe->s.reorder_head;
> +             queue->s.tail             = reorder_prev;
> +             origin_qe->s.reorder_head = reorder_prev->next;
> +             reorder_prev->next        = NULL;
> +     }
>  
> -             if (reorder_buf)
> -                     queue_enq_internal(reorder_buf);
> +     /* Reflect resolved orders in the output sequence */
> +     order_release(origin_qe, release_count + placeholder_count);
>  
> -             /* Free all placeholder bufs that are now released */
> -             while (placeholder_buf) {
> -                     next_buf = placeholder_buf->next;
> -                     odp_buffer_free(placeholder_buf->handle.handle);
> -                     placeholder_buf = next_buf;
> -             }
> -     } else {
> +     /* Now handle any unblocked complete buffers destined for
> +      * other queues, appending placeholder bufs as needed.
> +      */
> +     UNLOCK(&queue->s.lock);
> +     reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1);
> +     UNLOCK(&origin_qe->s.lock);
> +
> +     if (reorder_buf)
> +             queue_enq_internal(reorder_buf);
> +
> +     /* Free all placeholder bufs that are now released */
> +     while (placeholder_buf) {
> +             next_buf = placeholder_buf->next;
> +             odp_buffer_free(placeholder_buf->handle.handle);
> +             placeholder_buf = next_buf;
> +     }
> +
> +     /* Add queue to scheduling */
> +     if (sched && schedule_queue(queue))
> +             ODP_ABORT("schedule_queue failed\n");
> +
> +     return 0;
> +}
> +
> +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
> +{
> +     int sched = 0;
> +     queue_entry_t *origin_qe;
> +     uint64_t order;
> +     odp_buffer_hdr_t *buf_tail;
> +
> +     get_queue_order(&origin_qe, &order, buf_hdr);
> +
> +     if (origin_qe)
> +             return _queue_enq_ordered(queue, buf_hdr, sustain,
> +                                       order, origin_qe);
> +
> +     LOCK(&queue->s.lock);
> +
> +     if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
>               UNLOCK(&queue->s.lock);
> +             ODP_ERR("Bad queue status\n");
> +             return -1;
>       }
>  
> +     buf_tail = buf_hdr;
> +
> +     sched = _queue_enq_update(queue, buf_hdr, buf_tail);
> +
> +     UNLOCK(&queue->s.lock);
> +
>       /* Add queue to scheduling */
>       if (sched && schedule_queue(queue))
>               ODP_ABORT("schedule_queue failed\n");
> _______________________________________________
> lng-odp mailing list
> [email protected]
> https://lists.linaro.org/mailman/listinfo/lng-odp

_______________________________________________
lng-odp mailing list
[email protected]
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to