Added multi-data versions of ring enqueue and dequeue operations.
Signed-off-by: Petri Savolainen <[email protected]>
---
platform/linux-generic/include/odp_ring_internal.h | 65 ++++++++++++++++++++++
1 file changed, 65 insertions(+)
diff --git a/platform/linux-generic/include/odp_ring_internal.h
b/platform/linux-generic/include/odp_ring_internal.h
index 6a6291a..55fedeb 100644
--- a/platform/linux-generic/include/odp_ring_internal.h
+++ b/platform/linux-generic/include/odp_ring_internal.h
@@ -80,6 +80,45 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
return data;
}
+/* Dequeue multiple data from the ring head. Num is smaller than ring size. */
+static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
+ uint32_t data[], uint32_t num)
+{
+ uint32_t head, tail, new_head, i;
+
+ head = odp_atomic_load_u32(&ring->r_head);
+
+ /* Move reader head. This thread owns data at the new head. */
+ do {
+ tail = odp_atomic_load_u32(&ring->w_tail);
+
+ /* Ring is empty */
+ if (head == tail)
+ return 0;
+
+ /* Try to take all available */
+ if ((tail - head) < num)
+ num = tail - head;
+
+ new_head = head + num;
+
+ } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
+ new_head) == 0));
+
+ /* Read queue index */
+ for (i = 0; i < num; i++)
+ data[i] = ring->data[(head + 1 + i) & mask];
+
+ /* Wait until other readers have updated the tail */
+ while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
+ odp_cpu_pause();
+
+ /* Now update the reader tail */
+ odp_atomic_store_rel_u32(&ring->r_tail, new_head);
+
+ return num;
+}
+
/* Enqueue data into the ring tail */
static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data)
{
@@ -104,6 +143,32 @@ static inline void ring_enq(ring_t *ring, uint32_t mask,
uint32_t data)
odp_atomic_store_rel_u32(&ring->w_tail, new_head);
}
+/* Enqueue multiple data into the ring tail. Num is smaller than ring size. */
+static inline void ring_enq_multi(ring_t *ring, uint32_t mask, uint32_t data[],
+ uint32_t num)
+{
+ uint32_t old_head, new_head, i;
+
+ /* Reserve a slot in the ring for writing */
+ old_head = odp_atomic_fetch_add_u32(&ring->w_head, num);
+ new_head = old_head + 1;
+
+ /* Ring is full. Wait for the last reader to finish. */
+ while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
+ odp_cpu_pause();
+
+ /* Write data */
+ for (i = 0; i < num; i++)
+ ring->data[(new_head + i) & mask] = data[i];
+
+ /* Wait until other writers have updated the tail */
+ while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
+ odp_cpu_pause();
+
+ /* Now update the writer tail */
+ odp_atomic_store_rel_u32(&ring->w_tail, old_head + num);
+}
+
#ifdef __cplusplus
}
#endif
--
2.8.1