This patch is superseded by patch series http://patches.opendataplane.org/patch/3661/ through http://patches.opendataplane.org/patch/3666/ that resolves Bug https://bugs.linaro.org/show_bug.cgi?id=1879
As part of the restructuring to resolve that patch, ordered queue processing is now handled in a separate routine, accomplishing the same goals as this patch. On Thu, Oct 8, 2015 at 4:10 AM, Nicolas Morey-Chaisemartin <[email protected] > wrote: > Ping > > On 09/10/2015 05:22 PM, Nicolas Morey-Chaisemartin wrote: > > This allows better readability, and make supporting ordered queues > > easier for other platforms > > > > Signed-off-by: Nicolas Morey-Chaisemartin <[email protected]> > > --- > > platform/linux-generic/odp_queue.c | 225 > +++++++++++++++++++++---------------- > > 1 file changed, 127 insertions(+), 98 deletions(-) > > > > diff --git a/platform/linux-generic/odp_queue.c > b/platform/linux-generic/odp_queue.c > > index ac933da..cf8e3ef 100644 > > --- a/platform/linux-generic/odp_queue.c > > +++ b/platform/linux-generic/odp_queue.c > > @@ -341,142 +341,171 @@ odp_queue_t odp_queue_lookup(const char *name) > > return ODP_QUEUE_INVALID; > > } > > > > +/* Update queue head and/or tail and schedule status > > + * Return if the queue needs to be reschedule. > > + * Queue must be locked before calling this function > > + */ > > +static int _queue_enq_update(queue_entry_t *queue, odp_buffer_hdr_t > *head, > > + odp_buffer_hdr_t *tail){ > > + if (!queue->s.head) { > > + /* Empty queue */ > > + queue->s.head = head; > > + queue->s.tail = tail; > > + } else { > > + queue->s.tail->next = head; > > + queue->s.tail = tail; > > + } > > + tail->next = NULL; > > > > -int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int > sustain) > > + if (queue->s.status == QUEUE_STATUS_NOTSCHED) { > > + queue->s.status = QUEUE_STATUS_SCHED; > > + return 1; /* retval: schedule queue */ > > + } > > + return 0; > > +} > > + > > +static int _queue_enq_ordered(queue_entry_t *queue, odp_buffer_hdr_t > *buf_hdr, > > + int sustain, uint64_t order, > > + queue_entry_t *origin_qe) > > { > > int sched = 0; > > - queue_entry_t *origin_qe; > > - uint64_t order; > > odp_buffer_hdr_t *buf_tail; > > > > - get_queue_order(&origin_qe, &order, buf_hdr); > > + LOCK(&origin_qe->s.lock); > > > > /* Need two locks for enq operations from ordered queues */ > > - if (origin_qe) { > > + while (!LOCK_TRY(&queue->s.lock)) { > > + UNLOCK(&origin_qe->s.lock); > > LOCK(&origin_qe->s.lock); > > - while (!LOCK_TRY(&queue->s.lock)) { > > - UNLOCK(&origin_qe->s.lock); > > - LOCK(&origin_qe->s.lock); > > - } > > - if (odp_unlikely(origin_qe->s.status < > QUEUE_STATUS_READY)) { > > - UNLOCK(&queue->s.lock); > > - UNLOCK(&origin_qe->s.lock); > > - ODP_ERR("Bad origin queue status\n"); > > - ODP_ERR("queue = %s, origin q = %s, buf = %p\n", > > - queue->s.name, origin_qe->s.name, > buf_hdr); > > - return -1; > > - } > > - } else { > > - LOCK(&queue->s.lock); > > + } > > + > > + if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) { > > + UNLOCK(&queue->s.lock); > > + UNLOCK(&origin_qe->s.lock); > > + ODP_ERR("Bad origin queue status\n"); > > + ODP_ERR("queue = %s, origin q = %s, buf = %p\n", > > + queue->s.name, origin_qe->s.name, buf_hdr); > > + return -1; > > } > > > > if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { > > UNLOCK(&queue->s.lock); > > - if (origin_qe) > > - UNLOCK(&origin_qe->s.lock); > > + UNLOCK(&origin_qe->s.lock); > > ODP_ERR("Bad queue status\n"); > > return -1; > > } > > > > /* We can only complete the enq if we're in order */ > > - if (origin_qe) { > > - sched_enq_called(); > > - if (order > origin_qe->s.order_out) { > > - reorder_enq(queue, order, origin_qe, buf_hdr, > sustain); > > + sched_enq_called(); > > + if (order > origin_qe->s.order_out) { > > + reorder_enq(queue, order, origin_qe, buf_hdr, sustain); > > > > - /* This enq can't complete until order is > restored, so > > - * we're done here. > > - */ > > - UNLOCK(&queue->s.lock); > > - UNLOCK(&origin_qe->s.lock); > > - return 0; > > - } > > + /* This enq can't complete until order is restored, so > > + * we're done here. > > + */ > > + UNLOCK(&queue->s.lock); > > + UNLOCK(&origin_qe->s.lock); > > + return 0; > > + } > > > > - /* We're in order, so account for this and proceed with > enq */ > > - if (!sustain) { > > - order_release(origin_qe, 1); > > - sched_order_resolved(buf_hdr); > > - } > > + /* We're in order, so account for this and proceed with enq */ > > + if (!sustain) { > > + order_release(origin_qe, 1); > > + sched_order_resolved(buf_hdr); > > + } > > > > - /* if this element is linked, restore the linked chain */ > > - buf_tail = buf_hdr->link; > > + /* if this element is linked, restore the linked chain */ > > + buf_tail = buf_hdr->link; > > > > - if (buf_tail) { > > - buf_hdr->next = buf_tail; > > - buf_hdr->link = NULL; > > + if (buf_tail) { > > + buf_hdr->next = buf_tail; > > + buf_hdr->link = NULL; > > > > - /* find end of the chain */ > > - while (buf_tail->next) > > - buf_tail = buf_tail->next; > > - } else { > > - buf_tail = buf_hdr; > > - } > > + /* find end of the chain */ > > + while (buf_tail->next) > > + buf_tail = buf_tail->next; > > } else { > > buf_tail = buf_hdr; > > } > > > > - if (!queue->s.head) { > > - /* Empty queue */ > > - queue->s.head = buf_hdr; > > - queue->s.tail = buf_tail; > > - buf_tail->next = NULL; > > - } else { > > - queue->s.tail->next = buf_hdr; > > - queue->s.tail = buf_tail; > > - buf_tail->next = NULL; > > - } > > - > > - if (queue->s.status == QUEUE_STATUS_NOTSCHED) { > > - queue->s.status = QUEUE_STATUS_SCHED; > > - sched = 1; /* retval: schedule queue */ > > - } > > + sched = _queue_enq_update(queue, buf_hdr, buf_tail); > > > > /* > > * If we came from an ordered queue, check to see if our successful > > * enq has unblocked other buffers in the origin's reorder queue. > > */ > > - if (origin_qe) { > > - odp_buffer_hdr_t *reorder_buf; > > - odp_buffer_hdr_t *next_buf; > > - odp_buffer_hdr_t *reorder_prev; > > - odp_buffer_hdr_t *placeholder_buf; > > - int deq_count, release_count, > placeholder_count; > > - > > - deq_count = reorder_deq(queue, origin_qe, &reorder_buf, > > - &reorder_prev, &placeholder_buf, > > - &release_count, > &placeholder_count); > > - > > - /* Add released buffers to the queue as well */ > > - if (deq_count > 0) { > > - queue->s.tail->next = > origin_qe->s.reorder_head; > > - queue->s.tail = reorder_prev; > > - origin_qe->s.reorder_head = reorder_prev->next; > > - reorder_prev->next = NULL; > > - } > > + odp_buffer_hdr_t *reorder_buf; > > + odp_buffer_hdr_t *next_buf; > > + odp_buffer_hdr_t *reorder_prev; > > + odp_buffer_hdr_t *placeholder_buf; > > + int deq_count, release_count, placeholder_count; > > > > - /* Reflect resolved orders in the output sequence */ > > - order_release(origin_qe, release_count + > placeholder_count); > > + deq_count = reorder_deq(queue, origin_qe, &reorder_buf, > > + &reorder_prev, &placeholder_buf, > > + &release_count, &placeholder_count); > > > > - /* Now handle any unblocked complete buffers destined for > > - * other queues, appending placeholder bufs as needed. > > - */ > > - UNLOCK(&queue->s.lock); > > - reorder_complete(origin_qe, &reorder_buf, > &placeholder_buf, 1); > > - UNLOCK(&origin_qe->s.lock); > > + /* Add released buffers to the queue as well */ > > + if (deq_count > 0) { > > + queue->s.tail->next = origin_qe->s.reorder_head; > > + queue->s.tail = reorder_prev; > > + origin_qe->s.reorder_head = reorder_prev->next; > > + reorder_prev->next = NULL; > > + } > > > > - if (reorder_buf) > > - queue_enq_internal(reorder_buf); > > + /* Reflect resolved orders in the output sequence */ > > + order_release(origin_qe, release_count + placeholder_count); > > > > - /* Free all placeholder bufs that are now released */ > > - while (placeholder_buf) { > > - next_buf = placeholder_buf->next; > > - odp_buffer_free(placeholder_buf->handle.handle); > > - placeholder_buf = next_buf; > > - } > > - } else { > > + /* Now handle any unblocked complete buffers destined for > > + * other queues, appending placeholder bufs as needed. > > + */ > > + UNLOCK(&queue->s.lock); > > + reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1); > > + UNLOCK(&origin_qe->s.lock); > > + > > + if (reorder_buf) > > + queue_enq_internal(reorder_buf); > > + > > + /* Free all placeholder bufs that are now released */ > > + while (placeholder_buf) { > > + next_buf = placeholder_buf->next; > > + odp_buffer_free(placeholder_buf->handle.handle); > > + placeholder_buf = next_buf; > > + } > > + > > + /* Add queue to scheduling */ > > + if (sched && schedule_queue(queue)) > > + ODP_ABORT("schedule_queue failed\n"); > > + > > + return 0; > > +} > > + > > +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int > sustain) > > +{ > > + int sched = 0; > > + queue_entry_t *origin_qe; > > + uint64_t order; > > + odp_buffer_hdr_t *buf_tail; > > + > > + get_queue_order(&origin_qe, &order, buf_hdr); > > + > > + if (origin_qe) > > + return _queue_enq_ordered(queue, buf_hdr, sustain, > > + order, origin_qe); > > + > > + LOCK(&queue->s.lock); > > + > > + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { > > UNLOCK(&queue->s.lock); > > + ODP_ERR("Bad queue status\n"); > > + return -1; > > } > > > > + buf_tail = buf_hdr; > > + > > + sched = _queue_enq_update(queue, buf_hdr, buf_tail); > > + > > + UNLOCK(&queue->s.lock); > > + > > /* Add queue to scheduling */ > > if (sched && schedule_queue(queue)) > > ODP_ABORT("schedule_queue failed\n"); > > _______________________________________________ > > lng-odp mailing list > > [email protected] > > https://lists.linaro.org/mailman/listinfo/lng-odp > > _______________________________________________ > lng-odp mailing list > [email protected] > https://lists.linaro.org/mailman/listinfo/lng-odp >
_______________________________________________ lng-odp mailing list [email protected] https://lists.linaro.org/mailman/listinfo/lng-odp
