The change to the NULL comparisons is there to keep checkpatch happy.  I agree
that an explicit comparison to NULL is clearer, but that's not the Way We Do
It® in the Linux world.

On Thu, Aug 27, 2015 at 9:45 AM, Bill Fischofer <[email protected]>
wrote:

> It's as simple as I could make it.  The semantics of ordering are
> non-trivial.  If you have suggestions for how to simplify, feel free to
> post improvement patches; however, there are a *lot* of subtle race
> conditions that can arise if you want to guarantee order preservation.
> It's one of the reasons SoCs have ordering HW.
>
> On Thu, Aug 27, 2015 at 7:15 AM, Savolainen, Petri (Nokia - FI/Espoo) <
> [email protected]> wrote:
>
>>
>> To me the implementation seems unnecessarily complex, and I cannot tell
>> quickly from the code whether it works. For example, the new queue_enq() has
>> 5x the number of lines compared to the original:
>>
>> int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
>> {
>>         int sched = 0;
>>
>>         LOCK(&queue->s.lock);
>>         if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
>>                 UNLOCK(&queue->s.lock);
>>                 ODP_ERR("Bad queue status\n");
>>                 return -1;
>>         }
>>
>>         if (queue->s.head == NULL) {
>>                 /* Empty queue */
>>                 queue->s.head = buf_hdr;
>>                 queue->s.tail = buf_hdr;
>>                 buf_hdr->next = NULL;
>>         } else {
>>                 queue->s.tail->next = buf_hdr;
>>                 queue->s.tail = buf_hdr;
>>                 buf_hdr->next = NULL;
>>         }
>>
>>         if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
>>                 queue->s.status = QUEUE_STATUS_SCHED;
>>                 sched = 1; /* retval: schedule queue */
>>         }
>>         UNLOCK(&queue->s.lock);
>>
>>         /* Add queue to scheduling */
>>         if (sched && schedule_queue(queue))
>>                 ODP_ABORT("schedule_queue failed\n");
>>
>>         return 0;
>> }
>>
>> Still, I'm OK with putting it in and cleaning it up afterwards. We need an
>> implementation to develop and test the validation suite. It would help
>> later development if the ordered-queue-specific code were contained in
>> separate functions, and if the original lines were kept when there is no
>> real need to change them:
>>
>> > -   if (queue->s.head == NULL) {
>> > +     if (!queue->s.head) {
>>
>> To me the original style shows more clearly what is being compared.
>>
>>
>> -Petri
>>
>> >
>> >
>> > -int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
>> > +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int
>> > sustain)
>> >  {
>> >       int sched = 0;
>> > +     queue_entry_t *origin_qe;
>> > +     uint64_t order;
>> > +     odp_buffer_hdr_t *buf_tail;
>> > +
>> > +     get_queue_order(&origin_qe, &order, buf_hdr);
>> > +
>> > +     /* Need two locks for enq operations from ordered queues */
>> > +     if (origin_qe) {
>> > +             LOCK(&origin_qe->s.lock);
>> > +             while (!LOCK_TRY(&queue->s.lock)) {
>> > +                     UNLOCK(&origin_qe->s.lock);
>> > +                     LOCK(&origin_qe->s.lock);
>> > +             }
>> > +             if (odp_unlikely(origin_qe->s.status <
>> QUEUE_STATUS_READY))
>> > {
>> > +                     UNLOCK(&queue->s.lock);
>> > +                     UNLOCK(&origin_qe->s.lock);
>> > +                     ODP_ERR("Bad origin queue status\n");
>> > +                     ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
>> > +                             queue->s.name, origin_qe->s.name,
>> buf_hdr);
>> > +                     return -1;
>> > +             }
>> > +     } else {
>> > +             LOCK(&queue->s.lock);
>> > +     }
>> >
>> > -     LOCK(&queue->s.lock);
>> >       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
>> >               UNLOCK(&queue->s.lock);
>> > +             if (origin_qe)
>> > +                     UNLOCK(&origin_qe->s.lock);
>> >               ODP_ERR("Bad queue status\n");
>> >               return -1;
>> >       }
>> >
>> > -     if (queue->s.head == NULL) {
>> > +     /* We can only complete the enq if we're in order */
>> > +     if (origin_qe) {
>> > +             sched_enq_called();
>> > +             if (order > origin_qe->s.order_out) {
>> > +                     reorder_enq(queue, order, origin_qe, buf_hdr,
>> > sustain);
>> > +
>> > +                     /* This enq can't complete until order is
>> restored,
>> > so
>> > +                      * we're done here.
>> > +                      */
>> > +                     UNLOCK(&queue->s.lock);
>> > +                     UNLOCK(&origin_qe->s.lock);
>> > +                     return 0;
>> > +             }
>> > +
>> > +             /* We're in order, so account for this and proceed with
>> enq
>> > */
>> > +             if (!sustain) {
>> > +                     order_release(origin_qe, 1);
>> > +                     sched_order_resolved(buf_hdr);
>> > +             }
>> > +
>> > +             /* if this element is linked, restore the linked chain */
>> > +             buf_tail = buf_hdr->link;
>> > +
>> > +             if (buf_tail) {
>> > +                     buf_hdr->next = buf_tail;
>> > +                     buf_hdr->link = NULL;
>> > +
>> > +                     /* find end of the chain */
>> > +                     while (buf_tail->next)
>> > +                             buf_tail = buf_tail->next;
>> > +             } else {
>> > +                     buf_tail = buf_hdr;
>> > +             }
>> > +     } else {
>> > +             buf_tail = buf_hdr;
>> > +     }
>> > +
>> > +     if (!queue->s.head) {
>> >               /* Empty queue */
>> >               queue->s.head = buf_hdr;
>> > -             queue->s.tail = buf_hdr;
>> > -             buf_hdr->next = NULL;
>> > +             queue->s.tail = buf_tail;
>> > +             buf_tail->next = NULL;
>> >       } else {
>> >               queue->s.tail->next = buf_hdr;
>> > -             queue->s.tail = buf_hdr;
>> > -             buf_hdr->next = NULL;
>> > +             queue->s.tail = buf_tail;
>> > +             buf_tail->next = NULL;
>> >       }
>> >
>> >       if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
>> >               queue->s.status = QUEUE_STATUS_SCHED;
>> >               sched = 1; /* retval: schedule queue */
>> >       }
>> > -     UNLOCK(&queue->s.lock);
>> > +
>> > +     /*
>> > +      * If we came from an ordered queue, check to see if our
>> > successful
>> > +      * enq has unblocked other buffers in the origin's reorder queue.
>> > +      */
>> > +     if (origin_qe) {
>> > +             odp_buffer_hdr_t *reorder_buf;
>> > +             odp_buffer_hdr_t *next_buf;
>> > +             odp_buffer_hdr_t *reorder_prev;
>> > +             odp_buffer_hdr_t *placeholder_buf;
>> > +             int               deq_count, release_count,
>> > placeholder_count;
>> > +
>> > +             deq_count = reorder_deq(queue, origin_qe, &reorder_buf,
>> > +                                     &reorder_prev, &placeholder_buf,
>> > +                                     &release_count,
>> &placeholder_count);
>> > +
>> > +             /* Add released buffers to the queue as well */
>> > +             if (deq_count > 0) {
>> > +                     queue->s.tail->next       = origin_qe-
>> > >s.reorder_head;
>> > +                     queue->s.tail             = reorder_prev;
>> > +                     origin_qe->s.reorder_head = reorder_prev->next;
>> > +                     reorder_prev->next        = NULL;
>> > +             }
>> > +
>> > +             /* Reflect resolved orders in the output sequence */
>> > +             order_release(origin_qe, release_count +
>> > placeholder_count);
>> > +
>> > +             /* Now handle any unblocked complete buffers destined for
>> > +              * other queues.  Note that these must be complete because
>> > +              * otherwise another thread is working on it and is
>> > +              * responsible for resolving order when it is complete.
>> > +              */
>> > +             UNLOCK(&queue->s.lock);
>> > +
>> > +             if (reorder_buf &&
>> > +                 reorder_buf->order <= origin_qe->s.order_out &&
>> > +                 reorder_complete(reorder_buf))
>> > +                     origin_qe->s.reorder_head = reorder_buf->next;
>> > +             else
>> > +                     reorder_buf = NULL;
>> > +             UNLOCK(&origin_qe->s.lock);
>> > +
>> > +             if (reorder_buf)
>> > +                     queue_enq_internal(reorder_buf);
>> > +
>> > +             /* Free all placeholder bufs that are now released */
>> > +             while (placeholder_buf) {
>> > +                     next_buf = placeholder_buf->next;
>> > +                     odp_buffer_free(placeholder_buf->handle.handle);
>> > +                     placeholder_buf = next_buf;
>> > +             }
>> > +     } else {
>> > +             UNLOCK(&queue->s.lock);
>> > +     }
>> >
>> >       /* Add queue to scheduling */
>> >       if (sched && schedule_queue(queue))
>> > @@ -361,18 +483,31 @@ int queue_enq(queue_entry_t *queue,
>> > odp_buffer_hdr_t *buf_hdr)
>> >       return 0;
>> >  }
>>
>
>
_______________________________________________
lng-odp mailing list
[email protected]
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to