To me the implementation seems unnecessary complex and cannot tell quickly from
the code if it works. For example, the new queue_enq() is 5x number of lines
compared to the original:
int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
{
int sched = 0;
LOCK(&queue->s.lock);
if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
UNLOCK(&queue->s.lock);
ODP_ERR("Bad queue status\n");
return -1;
}
if (queue->s.head == NULL) {
/* Empty queue */
queue->s.head = buf_hdr;
queue->s.tail = buf_hdr;
buf_hdr->next = NULL;
} else {
queue->s.tail->next = buf_hdr;
queue->s.tail = buf_hdr;
buf_hdr->next = NULL;
}
if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
queue->s.status = QUEUE_STATUS_SCHED;
sched = 1; /* retval: schedule queue */
}
UNLOCK(&queue->s.lock);
/* Add queue to scheduling */
if (sched && schedule_queue(queue))
ODP_ABORT("schedule_queue failed\n");
return 0;
}
Still, I'm OK to put it in and clean afterwards. We need an implementation to
develop and test the validation suite. It would help later development if the
ordered queue specific code would be contained into separate functions, and
original lines are kept when no real need to change:
> - if (queue->s.head == NULL) {
> + if (!queue->s.head) {
To me the original style shows more clearly what is compared.
-Petri
>
>
> -int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
> +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int
> sustain)
> {
> int sched = 0;
> + queue_entry_t *origin_qe;
> + uint64_t order;
> + odp_buffer_hdr_t *buf_tail;
> +
> + get_queue_order(&origin_qe, &order, buf_hdr);
> +
> + /* Need two locks for enq operations from ordered queues */
> + if (origin_qe) {
> + LOCK(&origin_qe->s.lock);
> + while (!LOCK_TRY(&queue->s.lock)) {
> + UNLOCK(&origin_qe->s.lock);
> + LOCK(&origin_qe->s.lock);
> + }
> + if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY))
> {
> + UNLOCK(&queue->s.lock);
> + UNLOCK(&origin_qe->s.lock);
> + ODP_ERR("Bad origin queue status\n");
> + ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
> + queue->s.name, origin_qe->s.name, buf_hdr);
> + return -1;
> + }
> + } else {
> + LOCK(&queue->s.lock);
> + }
>
> - LOCK(&queue->s.lock);
> if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
> UNLOCK(&queue->s.lock);
> + if (origin_qe)
> + UNLOCK(&origin_qe->s.lock);
> ODP_ERR("Bad queue status\n");
> return -1;
> }
>
> - if (queue->s.head == NULL) {
> + /* We can only complete the enq if we're in order */
> + if (origin_qe) {
> + sched_enq_called();
> + if (order > origin_qe->s.order_out) {
> + reorder_enq(queue, order, origin_qe, buf_hdr,
> sustain);
> +
> + /* This enq can't complete until order is restored,
> so
> + * we're done here.
> + */
> + UNLOCK(&queue->s.lock);
> + UNLOCK(&origin_qe->s.lock);
> + return 0;
> + }
> +
> + /* We're in order, so account for this and proceed with enq
> */
> + if (!sustain) {
> + order_release(origin_qe, 1);
> + sched_order_resolved(buf_hdr);
> + }
> +
> + /* if this element is linked, restore the linked chain */
> + buf_tail = buf_hdr->link;
> +
> + if (buf_tail) {
> + buf_hdr->next = buf_tail;
> + buf_hdr->link = NULL;
> +
> + /* find end of the chain */
> + while (buf_tail->next)
> + buf_tail = buf_tail->next;
> + } else {
> + buf_tail = buf_hdr;
> + }
> + } else {
> + buf_tail = buf_hdr;
> + }
> +
> + if (!queue->s.head) {
> /* Empty queue */
> queue->s.head = buf_hdr;
> - queue->s.tail = buf_hdr;
> - buf_hdr->next = NULL;
> + queue->s.tail = buf_tail;
> + buf_tail->next = NULL;
> } else {
> queue->s.tail->next = buf_hdr;
> - queue->s.tail = buf_hdr;
> - buf_hdr->next = NULL;
> + queue->s.tail = buf_tail;
> + buf_tail->next = NULL;
> }
>
> if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
> queue->s.status = QUEUE_STATUS_SCHED;
> sched = 1; /* retval: schedule queue */
> }
> - UNLOCK(&queue->s.lock);
> +
> + /*
> + * If we came from an ordered queue, check to see if our
> successful
> + * enq has unblocked other buffers in the origin's reorder queue.
> + */
> + if (origin_qe) {
> + odp_buffer_hdr_t *reorder_buf;
> + odp_buffer_hdr_t *next_buf;
> + odp_buffer_hdr_t *reorder_prev;
> + odp_buffer_hdr_t *placeholder_buf;
> + int deq_count, release_count,
> placeholder_count;
> +
> + deq_count = reorder_deq(queue, origin_qe, &reorder_buf,
> + &reorder_prev, &placeholder_buf,
> + &release_count, &placeholder_count);
> +
> + /* Add released buffers to the queue as well */
> + if (deq_count > 0) {
> + queue->s.tail->next = origin_qe-
> >s.reorder_head;
> + queue->s.tail = reorder_prev;
> + origin_qe->s.reorder_head = reorder_prev->next;
> + reorder_prev->next = NULL;
> + }
> +
> + /* Reflect resolved orders in the output sequence */
> + order_release(origin_qe, release_count +
> placeholder_count);
> +
> + /* Now handle any unblocked complete buffers destined for
> + * other queues. Note that these must be complete because
> + * otherwise another thread is working on it and is
> + * responsible for resolving order when it is complete.
> + */
> + UNLOCK(&queue->s.lock);
> +
> + if (reorder_buf &&
> + reorder_buf->order <= origin_qe->s.order_out &&
> + reorder_complete(reorder_buf))
> + origin_qe->s.reorder_head = reorder_buf->next;
> + else
> + reorder_buf = NULL;
> + UNLOCK(&origin_qe->s.lock);
> +
> + if (reorder_buf)
> + queue_enq_internal(reorder_buf);
> +
> + /* Free all placeholder bufs that are now released */
> + while (placeholder_buf) {
> + next_buf = placeholder_buf->next;
> + odp_buffer_free(placeholder_buf->handle.handle);
> + placeholder_buf = next_buf;
> + }
> + } else {
> + UNLOCK(&queue->s.lock);
> + }
>
> /* Add queue to scheduling */
> if (sched && schedule_queue(queue))
> @@ -361,18 +483,31 @@ int queue_enq(queue_entry_t *queue,
> odp_buffer_hdr_t *buf_hdr)
> return 0;
> }
_______________________________________________
lng-odp mailing list
[email protected]
https://lists.linaro.org/mailman/listinfo/lng-odp