The change to the NULL comparisons is there to keep checkpatch happy. I agree that an explicit comparison to NULL is clearer, but that's not the Way We Do It® in the Linux world.
On Thu, Aug 27, 2015 at 9:45 AM, Bill Fischofer <[email protected]> wrote: > It's as simple as I could make it. The semantics of ordering are > non-trivial. If you have suggestions for how to simplify, feel free to > post improvement patches; however, there are a *lot* of subtle race > conditions that can arise if you want to guarantee order preservation. > It's one of the reasons SoCs have ordering HW. > > On Thu, Aug 27, 2015 at 7:15 AM, Savolainen, Petri (Nokia - FI/Espoo) < > [email protected]> wrote: > >> >> To me the implementation seems unnecessarily complex and I cannot tell >> quickly from the code if it works. For example, the new queue_enq() is 5x >> the number of lines compared to the original: >> >> int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) >> { >> int sched = 0; >> >> LOCK(&queue->s.lock); >> if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { >> UNLOCK(&queue->s.lock); >> ODP_ERR("Bad queue status\n"); >> return -1; >> } >> >> if (queue->s.head == NULL) { >> /* Empty queue */ >> queue->s.head = buf_hdr; >> queue->s.tail = buf_hdr; >> buf_hdr->next = NULL; >> } else { >> queue->s.tail->next = buf_hdr; >> queue->s.tail = buf_hdr; >> buf_hdr->next = NULL; >> } >> >> if (queue->s.status == QUEUE_STATUS_NOTSCHED) { >> queue->s.status = QUEUE_STATUS_SCHED; >> sched = 1; /* retval: schedule queue */ >> } >> UNLOCK(&queue->s.lock); >> >> /* Add queue to scheduling */ >> if (sched && schedule_queue(queue)) >> ODP_ABORT("schedule_queue failed\n"); >> >> return 0; >> } >> >> Still, I'm OK to put it in and clean up afterwards. We need an >> implementation to develop and test the validation suite. It would help >> later development if the ordered-queue-specific code were contained >> in separate functions, and original lines were kept when there is no real need to >> change: >> >> > - if (queue->s.head == NULL) { >> > + if (!queue->s.head) { >> >> To me the original style shows more clearly what is compared. 
>> >> >> -Petri >> >> > >> > >> > -int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) >> > +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int >> > sustain) >> > { >> > int sched = 0; >> > + queue_entry_t *origin_qe; >> > + uint64_t order; >> > + odp_buffer_hdr_t *buf_tail; >> > + >> > + get_queue_order(&origin_qe, &order, buf_hdr); >> > + >> > + /* Need two locks for enq operations from ordered queues */ >> > + if (origin_qe) { >> > + LOCK(&origin_qe->s.lock); >> > + while (!LOCK_TRY(&queue->s.lock)) { >> > + UNLOCK(&origin_qe->s.lock); >> > + LOCK(&origin_qe->s.lock); >> > + } >> > + if (odp_unlikely(origin_qe->s.status < >> QUEUE_STATUS_READY)) >> > { >> > + UNLOCK(&queue->s.lock); >> > + UNLOCK(&origin_qe->s.lock); >> > + ODP_ERR("Bad origin queue status\n"); >> > + ODP_ERR("queue = %s, origin q = %s, buf = %p\n", >> > + queue->s.name, origin_qe->s.name, >> buf_hdr); >> > + return -1; >> > + } >> > + } else { >> > + LOCK(&queue->s.lock); >> > + } >> > >> > - LOCK(&queue->s.lock); >> > if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { >> > UNLOCK(&queue->s.lock); >> > + if (origin_qe) >> > + UNLOCK(&origin_qe->s.lock); >> > ODP_ERR("Bad queue status\n"); >> > return -1; >> > } >> > >> > - if (queue->s.head == NULL) { >> > + /* We can only complete the enq if we're in order */ >> > + if (origin_qe) { >> > + sched_enq_called(); >> > + if (order > origin_qe->s.order_out) { >> > + reorder_enq(queue, order, origin_qe, buf_hdr, >> > sustain); >> > + >> > + /* This enq can't complete until order is >> restored, >> > so >> > + * we're done here. 
>> > + */ >> > + UNLOCK(&queue->s.lock); >> > + UNLOCK(&origin_qe->s.lock); >> > + return 0; >> > + } >> > + >> > + /* We're in order, so account for this and proceed with >> enq >> > */ >> > + if (!sustain) { >> > + order_release(origin_qe, 1); >> > + sched_order_resolved(buf_hdr); >> > + } >> > + >> > + /* if this element is linked, restore the linked chain */ >> > + buf_tail = buf_hdr->link; >> > + >> > + if (buf_tail) { >> > + buf_hdr->next = buf_tail; >> > + buf_hdr->link = NULL; >> > + >> > + /* find end of the chain */ >> > + while (buf_tail->next) >> > + buf_tail = buf_tail->next; >> > + } else { >> > + buf_tail = buf_hdr; >> > + } >> > + } else { >> > + buf_tail = buf_hdr; >> > + } >> > + >> > + if (!queue->s.head) { >> > /* Empty queue */ >> > queue->s.head = buf_hdr; >> > - queue->s.tail = buf_hdr; >> > - buf_hdr->next = NULL; >> > + queue->s.tail = buf_tail; >> > + buf_tail->next = NULL; >> > } else { >> > queue->s.tail->next = buf_hdr; >> > - queue->s.tail = buf_hdr; >> > - buf_hdr->next = NULL; >> > + queue->s.tail = buf_tail; >> > + buf_tail->next = NULL; >> > } >> > >> > if (queue->s.status == QUEUE_STATUS_NOTSCHED) { >> > queue->s.status = QUEUE_STATUS_SCHED; >> > sched = 1; /* retval: schedule queue */ >> > } >> > - UNLOCK(&queue->s.lock); >> > + >> > + /* >> > + * If we came from an ordered queue, check to see if our >> > successful >> > + * enq has unblocked other buffers in the origin's reorder queue. 
>> > + */ >> > + if (origin_qe) { >> > + odp_buffer_hdr_t *reorder_buf; >> > + odp_buffer_hdr_t *next_buf; >> > + odp_buffer_hdr_t *reorder_prev; >> > + odp_buffer_hdr_t *placeholder_buf; >> > + int deq_count, release_count, >> > placeholder_count; >> > + >> > + deq_count = reorder_deq(queue, origin_qe, &reorder_buf, >> > + &reorder_prev, &placeholder_buf, >> > + &release_count, >> &placeholder_count); >> > + >> > + /* Add released buffers to the queue as well */ >> > + if (deq_count > 0) { >> > + queue->s.tail->next = origin_qe- >> > >s.reorder_head; >> > + queue->s.tail = reorder_prev; >> > + origin_qe->s.reorder_head = reorder_prev->next; >> > + reorder_prev->next = NULL; >> > + } >> > + >> > + /* Reflect resolved orders in the output sequence */ >> > + order_release(origin_qe, release_count + >> > placeholder_count); >> > + >> > + /* Now handle any unblocked complete buffers destined for >> > + * other queues. Note that these must be complete because >> > + * otherwise another thread is working on it and is >> > + * responsible for resolving order when it is complete. >> > + */ >> > + UNLOCK(&queue->s.lock); >> > + >> > + if (reorder_buf && >> > + reorder_buf->order <= origin_qe->s.order_out && >> > + reorder_complete(reorder_buf)) >> > + origin_qe->s.reorder_head = reorder_buf->next; >> > + else >> > + reorder_buf = NULL; >> > + UNLOCK(&origin_qe->s.lock); >> > + >> > + if (reorder_buf) >> > + queue_enq_internal(reorder_buf); >> > + >> > + /* Free all placeholder bufs that are now released */ >> > + while (placeholder_buf) { >> > + next_buf = placeholder_buf->next; >> > + odp_buffer_free(placeholder_buf->handle.handle); >> > + placeholder_buf = next_buf; >> > + } >> > + } else { >> > + UNLOCK(&queue->s.lock); >> > + } >> > >> > /* Add queue to scheduling */ >> > if (sched && schedule_queue(queue)) >> > @@ -361,18 +483,31 @@ int queue_enq(queue_entry_t *queue, >> > odp_buffer_hdr_t *buf_hdr) >> > return 0; >> > } >> > >
_______________________________________________ lng-odp mailing list [email protected] https://lists.linaro.org/mailman/listinfo/lng-odp
