To me the implementation seems unnecessary complex and cannot tell quickly from 
the code if it works. For example, the new queue_enq() is 5x number of lines 
compared to the original: 

int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
{
        int sched = 0;

        LOCK(&queue->s.lock);
        if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
                UNLOCK(&queue->s.lock);
                ODP_ERR("Bad queue status\n");
                return -1;
        }

        if (queue->s.head == NULL) {
                /* Empty queue */
                queue->s.head = buf_hdr;
                queue->s.tail = buf_hdr;
                buf_hdr->next = NULL;
        } else {
                queue->s.tail->next = buf_hdr;
                queue->s.tail = buf_hdr;
                buf_hdr->next = NULL;
        }

        if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
                queue->s.status = QUEUE_STATUS_SCHED;
                sched = 1; /* retval: schedule queue */
        }
        UNLOCK(&queue->s.lock);

        /* Add queue to scheduling */
        if (sched && schedule_queue(queue))
                ODP_ABORT("schedule_queue failed\n");

        return 0;
}

Still, I'm OK to put it in and clean afterwards. We need an implementation to 
develop and test the validation suite. It would help later development if the 
ordered queue specific code would be contained into separate functions, and 
original lines are kept when no real need to change:

> -   if (queue->s.head == NULL) {
> +     if (!queue->s.head) {

To me the original style shows more clearly what is compared.


-Petri

> 
> 
> -int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
> +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int
> sustain)
>  {
>       int sched = 0;
> +     queue_entry_t *origin_qe;
> +     uint64_t order;
> +     odp_buffer_hdr_t *buf_tail;
> +
> +     get_queue_order(&origin_qe, &order, buf_hdr);
> +
> +     /* Need two locks for enq operations from ordered queues */
> +     if (origin_qe) {
> +             LOCK(&origin_qe->s.lock);
> +             while (!LOCK_TRY(&queue->s.lock)) {
> +                     UNLOCK(&origin_qe->s.lock);
> +                     LOCK(&origin_qe->s.lock);
> +             }
> +             if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY))
> {
> +                     UNLOCK(&queue->s.lock);
> +                     UNLOCK(&origin_qe->s.lock);
> +                     ODP_ERR("Bad origin queue status\n");
> +                     ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
> +                             queue->s.name, origin_qe->s.name, buf_hdr);
> +                     return -1;
> +             }
> +     } else {
> +             LOCK(&queue->s.lock);
> +     }
> 
> -     LOCK(&queue->s.lock);
>       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
>               UNLOCK(&queue->s.lock);
> +             if (origin_qe)
> +                     UNLOCK(&origin_qe->s.lock);
>               ODP_ERR("Bad queue status\n");
>               return -1;
>       }
> 
> -     if (queue->s.head == NULL) {
> +     /* We can only complete the enq if we're in order */
> +     if (origin_qe) {
> +             sched_enq_called();
> +             if (order > origin_qe->s.order_out) {
> +                     reorder_enq(queue, order, origin_qe, buf_hdr,
> sustain);
> +
> +                     /* This enq can't complete until order is restored,
> so
> +                      * we're done here.
> +                      */
> +                     UNLOCK(&queue->s.lock);
> +                     UNLOCK(&origin_qe->s.lock);
> +                     return 0;
> +             }
> +
> +             /* We're in order, so account for this and proceed with enq
> */
> +             if (!sustain) {
> +                     order_release(origin_qe, 1);
> +                     sched_order_resolved(buf_hdr);
> +             }
> +
> +             /* if this element is linked, restore the linked chain */
> +             buf_tail = buf_hdr->link;
> +
> +             if (buf_tail) {
> +                     buf_hdr->next = buf_tail;
> +                     buf_hdr->link = NULL;
> +
> +                     /* find end of the chain */
> +                     while (buf_tail->next)
> +                             buf_tail = buf_tail->next;
> +             } else {
> +                     buf_tail = buf_hdr;
> +             }
> +     } else {
> +             buf_tail = buf_hdr;
> +     }
> +
> +     if (!queue->s.head) {
>               /* Empty queue */
>               queue->s.head = buf_hdr;
> -             queue->s.tail = buf_hdr;
> -             buf_hdr->next = NULL;
> +             queue->s.tail = buf_tail;
> +             buf_tail->next = NULL;
>       } else {
>               queue->s.tail->next = buf_hdr;
> -             queue->s.tail = buf_hdr;
> -             buf_hdr->next = NULL;
> +             queue->s.tail = buf_tail;
> +             buf_tail->next = NULL;
>       }
> 
>       if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
>               queue->s.status = QUEUE_STATUS_SCHED;
>               sched = 1; /* retval: schedule queue */
>       }
> -     UNLOCK(&queue->s.lock);
> +
> +     /*
> +      * If we came from an ordered queue, check to see if our
> successful
> +      * enq has unblocked other buffers in the origin's reorder queue.
> +      */
> +     if (origin_qe) {
> +             odp_buffer_hdr_t *reorder_buf;
> +             odp_buffer_hdr_t *next_buf;
> +             odp_buffer_hdr_t *reorder_prev;
> +             odp_buffer_hdr_t *placeholder_buf;
> +             int               deq_count, release_count,
> placeholder_count;
> +
> +             deq_count = reorder_deq(queue, origin_qe, &reorder_buf,
> +                                     &reorder_prev, &placeholder_buf,
> +                                     &release_count, &placeholder_count);
> +
> +             /* Add released buffers to the queue as well */
> +             if (deq_count > 0) {
> +                     queue->s.tail->next       = origin_qe-
> >s.reorder_head;
> +                     queue->s.tail             = reorder_prev;
> +                     origin_qe->s.reorder_head = reorder_prev->next;
> +                     reorder_prev->next        = NULL;
> +             }
> +
> +             /* Reflect resolved orders in the output sequence */
> +             order_release(origin_qe, release_count +
> placeholder_count);
> +
> +             /* Now handle any unblocked complete buffers destined for
> +              * other queues.  Note that these must be complete because
> +              * otherwise another thread is working on it and is
> +              * responsible for resolving order when it is complete.
> +              */
> +             UNLOCK(&queue->s.lock);
> +
> +             if (reorder_buf &&
> +                 reorder_buf->order <= origin_qe->s.order_out &&
> +                 reorder_complete(reorder_buf))
> +                     origin_qe->s.reorder_head = reorder_buf->next;
> +             else
> +                     reorder_buf = NULL;
> +             UNLOCK(&origin_qe->s.lock);
> +
> +             if (reorder_buf)
> +                     queue_enq_internal(reorder_buf);
> +
> +             /* Free all placeholder bufs that are now released */
> +             while (placeholder_buf) {
> +                     next_buf = placeholder_buf->next;
> +                     odp_buffer_free(placeholder_buf->handle.handle);
> +                     placeholder_buf = next_buf;
> +             }
> +     } else {
> +             UNLOCK(&queue->s.lock);
> +     }
> 
>       /* Add queue to scheduling */
>       if (sched && schedule_queue(queue))
> @@ -361,18 +483,31 @@ int queue_enq(queue_entry_t *queue,
> odp_buffer_hdr_t *buf_hdr)
>       return 0;
>  }
_______________________________________________
lng-odp mailing list
[email protected]
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to