It's as simple as I could make it. The semantics of ordering are non-trivial. If you have suggestions for how to simplify it, feel free to post improvement patches; however, there are a *lot* of subtle race conditions that can arise if you want to guarantee order preservation. It's one of the reasons SoCs have ordering HW.
On Thu, Aug 27, 2015 at 7:15 AM, Savolainen, Petri (Nokia - FI/Espoo) < [email protected]> wrote: > > To me the implementation seems unnecessary complex and cannot tell quickly > from the code if it works. For example, the new queue_enq() is 5x number of > lines compared to the original: > > int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) > { > int sched = 0; > > LOCK(&queue->s.lock); > if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { > UNLOCK(&queue->s.lock); > ODP_ERR("Bad queue status\n"); > return -1; > } > > if (queue->s.head == NULL) { > /* Empty queue */ > queue->s.head = buf_hdr; > queue->s.tail = buf_hdr; > buf_hdr->next = NULL; > } else { > queue->s.tail->next = buf_hdr; > queue->s.tail = buf_hdr; > buf_hdr->next = NULL; > } > > if (queue->s.status == QUEUE_STATUS_NOTSCHED) { > queue->s.status = QUEUE_STATUS_SCHED; > sched = 1; /* retval: schedule queue */ > } > UNLOCK(&queue->s.lock); > > /* Add queue to scheduling */ > if (sched && schedule_queue(queue)) > ODP_ABORT("schedule_queue failed\n"); > > return 0; > } > > Still, I'm OK to put it in and clean afterwards. We need an implementation > to develop and test the validation suite. It would help later development > if the ordered queue specific code would be contained into separate > functions, and original lines are kept when no real need to change: > > > - if (queue->s.head == NULL) { > > + if (!queue->s.head) { > > To me the original style shows more clearly what is compared. 
> > > -Petri > > > > > > > -int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) > > +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int > > sustain) > > { > > int sched = 0; > > + queue_entry_t *origin_qe; > > + uint64_t order; > > + odp_buffer_hdr_t *buf_tail; > > + > > + get_queue_order(&origin_qe, &order, buf_hdr); > > + > > + /* Need two locks for enq operations from ordered queues */ > > + if (origin_qe) { > > + LOCK(&origin_qe->s.lock); > > + while (!LOCK_TRY(&queue->s.lock)) { > > + UNLOCK(&origin_qe->s.lock); > > + LOCK(&origin_qe->s.lock); > > + } > > + if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) > > { > > + UNLOCK(&queue->s.lock); > > + UNLOCK(&origin_qe->s.lock); > > + ODP_ERR("Bad origin queue status\n"); > > + ODP_ERR("queue = %s, origin q = %s, buf = %p\n", > > + queue->s.name, origin_qe->s.name, > buf_hdr); > > + return -1; > > + } > > + } else { > > + LOCK(&queue->s.lock); > > + } > > > > - LOCK(&queue->s.lock); > > if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { > > UNLOCK(&queue->s.lock); > > + if (origin_qe) > > + UNLOCK(&origin_qe->s.lock); > > ODP_ERR("Bad queue status\n"); > > return -1; > > } > > > > - if (queue->s.head == NULL) { > > + /* We can only complete the enq if we're in order */ > > + if (origin_qe) { > > + sched_enq_called(); > > + if (order > origin_qe->s.order_out) { > > + reorder_enq(queue, order, origin_qe, buf_hdr, > > sustain); > > + > > + /* This enq can't complete until order is restored, > > so > > + * we're done here. 
> > + */ > > + UNLOCK(&queue->s.lock); > > + UNLOCK(&origin_qe->s.lock); > > + return 0; > > + } > > + > > + /* We're in order, so account for this and proceed with enq > > */ > > + if (!sustain) { > > + order_release(origin_qe, 1); > > + sched_order_resolved(buf_hdr); > > + } > > + > > + /* if this element is linked, restore the linked chain */ > > + buf_tail = buf_hdr->link; > > + > > + if (buf_tail) { > > + buf_hdr->next = buf_tail; > > + buf_hdr->link = NULL; > > + > > + /* find end of the chain */ > > + while (buf_tail->next) > > + buf_tail = buf_tail->next; > > + } else { > > + buf_tail = buf_hdr; > > + } > > + } else { > > + buf_tail = buf_hdr; > > + } > > + > > + if (!queue->s.head) { > > /* Empty queue */ > > queue->s.head = buf_hdr; > > - queue->s.tail = buf_hdr; > > - buf_hdr->next = NULL; > > + queue->s.tail = buf_tail; > > + buf_tail->next = NULL; > > } else { > > queue->s.tail->next = buf_hdr; > > - queue->s.tail = buf_hdr; > > - buf_hdr->next = NULL; > > + queue->s.tail = buf_tail; > > + buf_tail->next = NULL; > > } > > > > if (queue->s.status == QUEUE_STATUS_NOTSCHED) { > > queue->s.status = QUEUE_STATUS_SCHED; > > sched = 1; /* retval: schedule queue */ > > } > > - UNLOCK(&queue->s.lock); > > + > > + /* > > + * If we came from an ordered queue, check to see if our > > successful > > + * enq has unblocked other buffers in the origin's reorder queue. 
> > + */ > > + if (origin_qe) { > > + odp_buffer_hdr_t *reorder_buf; > > + odp_buffer_hdr_t *next_buf; > > + odp_buffer_hdr_t *reorder_prev; > > + odp_buffer_hdr_t *placeholder_buf; > > + int deq_count, release_count, > > placeholder_count; > > + > > + deq_count = reorder_deq(queue, origin_qe, &reorder_buf, > > + &reorder_prev, &placeholder_buf, > > + &release_count, > &placeholder_count); > > + > > + /* Add released buffers to the queue as well */ > > + if (deq_count > 0) { > > + queue->s.tail->next = origin_qe- > > >s.reorder_head; > > + queue->s.tail = reorder_prev; > > + origin_qe->s.reorder_head = reorder_prev->next; > > + reorder_prev->next = NULL; > > + } > > + > > + /* Reflect resolved orders in the output sequence */ > > + order_release(origin_qe, release_count + > > placeholder_count); > > + > > + /* Now handle any unblocked complete buffers destined for > > + * other queues. Note that these must be complete because > > + * otherwise another thread is working on it and is > > + * responsible for resolving order when it is complete. > > + */ > > + UNLOCK(&queue->s.lock); > > + > > + if (reorder_buf && > > + reorder_buf->order <= origin_qe->s.order_out && > > + reorder_complete(reorder_buf)) > > + origin_qe->s.reorder_head = reorder_buf->next; > > + else > > + reorder_buf = NULL; > > + UNLOCK(&origin_qe->s.lock); > > + > > + if (reorder_buf) > > + queue_enq_internal(reorder_buf); > > + > > + /* Free all placeholder bufs that are now released */ > > + while (placeholder_buf) { > > + next_buf = placeholder_buf->next; > > + odp_buffer_free(placeholder_buf->handle.handle); > > + placeholder_buf = next_buf; > > + } > > + } else { > > + UNLOCK(&queue->s.lock); > > + } > > > > /* Add queue to scheduling */ > > if (sched && schedule_queue(queue)) > > @@ -361,18 +483,31 @@ int queue_enq(queue_entry_t *queue, > > odp_buffer_hdr_t *buf_hdr) > > return 0; > > } >
_______________________________________________ lng-odp mailing list [email protected] https://lists.linaro.org/mailman/listinfo/lng-odp
