Fix a race condition that could cause events to be stuck in an ordered
queue's reorder queue.

Signed-off-by: Bill Fischofer <[email protected]>
---
 .../linux-generic/include/odp_queue_internal.h     | 36 ++++++++++++----
 platform/linux-generic/odp_queue.c                 | 48 +++++++++-------------
 2 files changed, 48 insertions(+), 36 deletions(-)

diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index f285ea3..48576bc 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -284,18 +284,38 @@ static inline int reorder_deq(queue_entry_t *queue,
        return deq_count;
 }
 
-static inline int reorder_complete(odp_buffer_hdr_t *reorder_buf)
+static inline void reorder_complete(queue_entry_t *origin_qe,
+                                   odp_buffer_hdr_t **reorder_buf_return,
+                                   odp_buffer_hdr_t **placeholder_buf,
+                                   int placeholder_append)
 {
-       odp_buffer_hdr_t *next_buf = reorder_buf->next;
-       uint64_t order = reorder_buf->order;
+       odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+       odp_buffer_hdr_t *next_buf;
+
+       *reorder_buf_return = NULL;
+       if (!placeholder_append)
+               *placeholder_buf = NULL;
 
-       while (reorder_buf->flags.sustain &&
-              next_buf && next_buf->order == order) {
-               reorder_buf = next_buf;
+       while (reorder_buf &&
+              reorder_buf->order <= origin_qe->s.order_out) {
                next_buf = reorder_buf->next;
-       }
 
-       return !reorder_buf->flags.sustain;
+               if (!reorder_buf->target_qe) {
+                       origin_qe->s.reorder_head = next_buf;
+                       reorder_buf->next         = *placeholder_buf;
+                       *placeholder_buf          = reorder_buf;
+
+                       reorder_buf = next_buf;
+                       order_release(origin_qe, 1);
+               } else if (reorder_buf->flags.sustain) {
+                       reorder_buf = next_buf;
+               } else {
+                       *reorder_buf_return = origin_qe->s.reorder_head;
+                       origin_qe->s.reorder_head =
+                               origin_qe->s.reorder_head->next;
+                       break;
+               }
+       }
 }
 
 static inline void get_queue_order(queue_entry_t **origin_qe, uint64_t *order,
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 87483b9..15abd93 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -458,18 +458,10 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
                order_release(origin_qe, release_count + placeholder_count);
 
                /* Now handle any unblocked complete buffers destined for
-                * other queues.  Note that these must be complete because
-                * otherwise another thread is working on it and is
-                * responsible for resolving order when it is complete.
+                * other queues, appending placeholder bufs as needed.
                 */
                UNLOCK(&queue->s.lock);
-
-               if (reorder_buf &&
-                   reorder_buf->order <= origin_qe->s.order_out &&
-                   reorder_complete(reorder_buf))
-                       origin_qe->s.reorder_head = reorder_buf->next;
-               else
-                       reorder_buf = NULL;
+               reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1);
                UNLOCK(&origin_qe->s.lock);
 
                if (reorder_buf)
@@ -827,12 +819,7 @@ int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
        order_release(origin_qe, release_count + placeholder_count);
 
        /* Now handle sends to other queues that are ready to go */
-       if (reorder_buf &&
-           reorder_buf->order <= origin_qe->s.order_out &&
-           reorder_complete(reorder_buf))
-               origin_qe->s.reorder_head = reorder_buf->next;
-       else
-               reorder_buf = NULL;
+       reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1);
 
        /* We're fully done with the origin_qe at last */
        UNLOCK(&origin_qe->s.lock);
@@ -910,23 +897,28 @@ int release_order(queue_entry_t *origin_qe, uint64_t order,
                order_release(origin_qe, 1);
 
                /* Check if this release allows us to unblock waiters.
-                * Note that we can only process complete waiters since
-                * if the sustain bit is set for a buffer this means that
-                * some other thread is working on it and will be
-                * responsible for resolving order when it is complete.
+                * At the point of this call, the reorder list may contain
+                * zero or more placeholders that need to be freed, followed
+                * by zero or one complete reorder buffer chain.
                 */
-               reorder_buf = origin_qe->s.reorder_head;
-
-               if (reorder_buf &&
-                   reorder_buf->order <= origin_qe->s.order_out &&
-                   reorder_complete(reorder_buf))
-                       origin_qe->s.reorder_head = reorder_buf->next;
-               else
-                       reorder_buf = NULL;
+               reorder_complete(origin_qe, &reorder_buf,
+                                &placeholder_buf_hdr, 0);
 
+               /* Now safe to unlock */
                UNLOCK(&origin_qe->s.lock);
+
+               /* If reorder_buf has a target, do the enq now */
                if (reorder_buf)
                        queue_enq_internal(reorder_buf);
+
+               while (placeholder_buf_hdr) {
+                       odp_buffer_hdr_t *placeholder_next =
+                               placeholder_buf_hdr->next;
+
+                       odp_buffer_free(placeholder_buf_hdr->handle.handle);
+                       placeholder_buf_hdr = placeholder_next;
+               }
+
                return 0;
        }
 
-- 
2.1.4

_______________________________________________
lng-odp mailing list
[email protected]
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to