This patch is superseded by patch series
http://patches.opendataplane.org/patch/3661/ through
http://patches.opendataplane.org/patch/3666/ that resolves Bug
https://bugs.linaro.org/show_bug.cgi?id=1879

As part of the restructuring to resolve that bug, ordered queue
processing is now handled in a separate routine, accomplishing the same
goals as this patch.

On Thu, Oct 8, 2015 at 4:10 AM, Nicolas Morey-Chaisemartin <[email protected]
> wrote:

> Ping
>
> On 09/10/2015 05:22 PM, Nicolas Morey-Chaisemartin wrote:
> > This allows better readability and makes supporting ordered queues
> >  easier for other platforms
> >
> > Signed-off-by: Nicolas Morey-Chaisemartin <[email protected]>
> > ---
> >  platform/linux-generic/odp_queue.c | 225
> +++++++++++++++++++++----------------
> >  1 file changed, 127 insertions(+), 98 deletions(-)
> >
> > diff --git a/platform/linux-generic/odp_queue.c
> b/platform/linux-generic/odp_queue.c
> > index ac933da..cf8e3ef 100644
> > --- a/platform/linux-generic/odp_queue.c
> > +++ b/platform/linux-generic/odp_queue.c
> > @@ -341,142 +341,171 @@ odp_queue_t odp_queue_lookup(const char *name)
> >       return ODP_QUEUE_INVALID;
> >  }
> >
> > +/* Update queue head and/or tail and schedule status
> > + * Return if the queue needs to be reschedule.
> > + * Queue must be locked before calling this function
> > + */
> > +static int _queue_enq_update(queue_entry_t *queue, odp_buffer_hdr_t
> *head,
> > +                          odp_buffer_hdr_t *tail){
> > +     if (!queue->s.head) {
> > +             /* Empty queue */
> > +             queue->s.head = head;
> > +             queue->s.tail = tail;
> > +     } else {
> > +             queue->s.tail->next = head;
> > +             queue->s.tail = tail;
> > +     }
> > +     tail->next = NULL;
> >
> > -int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int
> sustain)
> > +     if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
> > +             queue->s.status = QUEUE_STATUS_SCHED;
> > +             return  1; /* retval: schedule queue */
> > +     }
> > +     return 0;
> > +}
> > +
> > +static int _queue_enq_ordered(queue_entry_t *queue, odp_buffer_hdr_t
> *buf_hdr,
> > +                           int sustain, uint64_t order,
> > +                           queue_entry_t *origin_qe)
> >  {
> >       int sched = 0;
> > -     queue_entry_t *origin_qe;
> > -     uint64_t order;
> >       odp_buffer_hdr_t *buf_tail;
> >
> > -     get_queue_order(&origin_qe, &order, buf_hdr);
> > +     LOCK(&origin_qe->s.lock);
> >
> >       /* Need two locks for enq operations from ordered queues */
> > -     if (origin_qe) {
> > +     while (!LOCK_TRY(&queue->s.lock)) {
> > +             UNLOCK(&origin_qe->s.lock);
> >               LOCK(&origin_qe->s.lock);
> > -             while (!LOCK_TRY(&queue->s.lock)) {
> > -                     UNLOCK(&origin_qe->s.lock);
> > -                     LOCK(&origin_qe->s.lock);
> > -             }
> > -             if (odp_unlikely(origin_qe->s.status <
> QUEUE_STATUS_READY)) {
> > -                     UNLOCK(&queue->s.lock);
> > -                     UNLOCK(&origin_qe->s.lock);
> > -                     ODP_ERR("Bad origin queue status\n");
> > -                     ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
> > -                             queue->s.name, origin_qe->s.name,
> buf_hdr);
> > -                     return -1;
> > -             }
> > -     } else {
> > -             LOCK(&queue->s.lock);
> > +     }
> > +
> > +     if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
> > +             UNLOCK(&queue->s.lock);
> > +             UNLOCK(&origin_qe->s.lock);
> > +             ODP_ERR("Bad origin queue status\n");
> > +             ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
> > +                     queue->s.name, origin_qe->s.name, buf_hdr);
> > +             return -1;
> >       }
> >
> >       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
> >               UNLOCK(&queue->s.lock);
> > -             if (origin_qe)
> > -                     UNLOCK(&origin_qe->s.lock);
> > +             UNLOCK(&origin_qe->s.lock);
> >               ODP_ERR("Bad queue status\n");
> >               return -1;
> >       }
> >
> >       /* We can only complete the enq if we're in order */
> > -     if (origin_qe) {
> > -             sched_enq_called();
> > -             if (order > origin_qe->s.order_out) {
> > -                     reorder_enq(queue, order, origin_qe, buf_hdr,
> sustain);
> > +     sched_enq_called();
> > +     if (order > origin_qe->s.order_out) {
> > +             reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
> >
> > -                     /* This enq can't complete until order is
> restored, so
> > -                      * we're done here.
> > -                      */
> > -                     UNLOCK(&queue->s.lock);
> > -                     UNLOCK(&origin_qe->s.lock);
> > -                     return 0;
> > -             }
> > +             /* This enq can't complete until order is restored, so
> > +              * we're done here.
> > +              */
> > +             UNLOCK(&queue->s.lock);
> > +             UNLOCK(&origin_qe->s.lock);
> > +             return 0;
> > +     }
> >
> > -             /* We're in order, so account for this and proceed with
> enq */
> > -             if (!sustain) {
> > -                     order_release(origin_qe, 1);
> > -                     sched_order_resolved(buf_hdr);
> > -             }
> > +     /* We're in order, so account for this and proceed with enq */
> > +     if (!sustain) {
> > +             order_release(origin_qe, 1);
> > +             sched_order_resolved(buf_hdr);
> > +     }
> >
> > -             /* if this element is linked, restore the linked chain */
> > -             buf_tail = buf_hdr->link;
> > +     /* if this element is linked, restore the linked chain */
> > +     buf_tail = buf_hdr->link;
> >
> > -             if (buf_tail) {
> > -                     buf_hdr->next = buf_tail;
> > -                     buf_hdr->link = NULL;
> > +     if (buf_tail) {
> > +             buf_hdr->next = buf_tail;
> > +             buf_hdr->link = NULL;
> >
> > -                     /* find end of the chain */
> > -                     while (buf_tail->next)
> > -                             buf_tail = buf_tail->next;
> > -             } else {
> > -                     buf_tail = buf_hdr;
> > -             }
> > +             /* find end of the chain */
> > +             while (buf_tail->next)
> > +                     buf_tail = buf_tail->next;
> >       } else {
> >               buf_tail = buf_hdr;
> >       }
> >
> > -     if (!queue->s.head) {
> > -             /* Empty queue */
> > -             queue->s.head = buf_hdr;
> > -             queue->s.tail = buf_tail;
> > -             buf_tail->next = NULL;
> > -     } else {
> > -             queue->s.tail->next = buf_hdr;
> > -             queue->s.tail = buf_tail;
> > -             buf_tail->next = NULL;
> > -     }
> > -
> > -     if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
> > -             queue->s.status = QUEUE_STATUS_SCHED;
> > -             sched = 1; /* retval: schedule queue */
> > -     }
> > +     sched = _queue_enq_update(queue, buf_hdr, buf_tail);
> >
> >       /*
> >        * If we came from an ordered queue, check to see if our successful
> >        * enq has unblocked other buffers in the origin's reorder queue.
> >        */
> > -     if (origin_qe) {
> > -             odp_buffer_hdr_t *reorder_buf;
> > -             odp_buffer_hdr_t *next_buf;
> > -             odp_buffer_hdr_t *reorder_prev;
> > -             odp_buffer_hdr_t *placeholder_buf;
> > -             int               deq_count, release_count,
> placeholder_count;
> > -
> > -             deq_count = reorder_deq(queue, origin_qe, &reorder_buf,
> > -                                     &reorder_prev, &placeholder_buf,
> > -                                     &release_count,
> &placeholder_count);
> > -
> > -             /* Add released buffers to the queue as well */
> > -             if (deq_count > 0) {
> > -                     queue->s.tail->next       =
> origin_qe->s.reorder_head;
> > -                     queue->s.tail             = reorder_prev;
> > -                     origin_qe->s.reorder_head = reorder_prev->next;
> > -                     reorder_prev->next        = NULL;
> > -             }
> > +     odp_buffer_hdr_t *reorder_buf;
> > +     odp_buffer_hdr_t *next_buf;
> > +     odp_buffer_hdr_t *reorder_prev;
> > +     odp_buffer_hdr_t *placeholder_buf;
> > +     int               deq_count, release_count, placeholder_count;
> >
> > -             /* Reflect resolved orders in the output sequence */
> > -             order_release(origin_qe, release_count +
> placeholder_count);
> > +     deq_count = reorder_deq(queue, origin_qe, &reorder_buf,
> > +                             &reorder_prev, &placeholder_buf,
> > +                             &release_count, &placeholder_count);
> >
> > -             /* Now handle any unblocked complete buffers destined for
> > -              * other queues, appending placeholder bufs as needed.
> > -              */
> > -             UNLOCK(&queue->s.lock);
> > -             reorder_complete(origin_qe, &reorder_buf,
> &placeholder_buf, 1);
> > -             UNLOCK(&origin_qe->s.lock);
> > +     /* Add released buffers to the queue as well */
> > +     if (deq_count > 0) {
> > +             queue->s.tail->next       = origin_qe->s.reorder_head;
> > +             queue->s.tail             = reorder_prev;
> > +             origin_qe->s.reorder_head = reorder_prev->next;
> > +             reorder_prev->next        = NULL;
> > +     }
> >
> > -             if (reorder_buf)
> > -                     queue_enq_internal(reorder_buf);
> > +     /* Reflect resolved orders in the output sequence */
> > +     order_release(origin_qe, release_count + placeholder_count);
> >
> > -             /* Free all placeholder bufs that are now released */
> > -             while (placeholder_buf) {
> > -                     next_buf = placeholder_buf->next;
> > -                     odp_buffer_free(placeholder_buf->handle.handle);
> > -                     placeholder_buf = next_buf;
> > -             }
> > -     } else {
> > +     /* Now handle any unblocked complete buffers destined for
> > +      * other queues, appending placeholder bufs as needed.
> > +      */
> > +     UNLOCK(&queue->s.lock);
> > +     reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1);
> > +     UNLOCK(&origin_qe->s.lock);
> > +
> > +     if (reorder_buf)
> > +             queue_enq_internal(reorder_buf);
> > +
> > +     /* Free all placeholder bufs that are now released */
> > +     while (placeholder_buf) {
> > +             next_buf = placeholder_buf->next;
> > +             odp_buffer_free(placeholder_buf->handle.handle);
> > +             placeholder_buf = next_buf;
> > +     }
> > +
> > +     /* Add queue to scheduling */
> > +     if (sched && schedule_queue(queue))
> > +             ODP_ABORT("schedule_queue failed\n");
> > +
> > +     return 0;
> > +}
> > +
> > +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int
> sustain)
> > +{
> > +     int sched = 0;
> > +     queue_entry_t *origin_qe;
> > +     uint64_t order;
> > +     odp_buffer_hdr_t *buf_tail;
> > +
> > +     get_queue_order(&origin_qe, &order, buf_hdr);
> > +
> > +     if (origin_qe)
> > +             return _queue_enq_ordered(queue, buf_hdr, sustain,
> > +                                       order, origin_qe);
> > +
> > +     LOCK(&queue->s.lock);
> > +
> > +     if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
> >               UNLOCK(&queue->s.lock);
> > +             ODP_ERR("Bad queue status\n");
> > +             return -1;
> >       }
> >
> > +     buf_tail = buf_hdr;
> > +
> > +     sched = _queue_enq_update(queue, buf_hdr, buf_tail);
> > +
> > +     UNLOCK(&queue->s.lock);
> > +
> >       /* Add queue to scheduling */
> >       if (sched && schedule_queue(queue))
> >               ODP_ABORT("schedule_queue failed\n");
> > _______________________________________________
> > lng-odp mailing list
> > [email protected]
> > https://lists.linaro.org/mailman/listinfo/lng-odp
>
> _______________________________________________
> lng-odp mailing list
> [email protected]
> https://lists.linaro.org/mailman/listinfo/lng-odp
>
_______________________________________________
lng-odp mailing list
[email protected]
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to