If an application links one atomic queue to multiple ports
and each worker core updates the flow_id, there is a chance
of hitting a race condition that leads to the same event
being processed twice. This fix eliminates that race
condition.
Fixes: 4236ce9bf5bf ("event/opdl: add OPDL ring infrastructure library")
Signed-off-by: Liang Ma
Signed-off-by: Peter Mccarthy
---
drivers/event/opdl/opdl_evdev_init.c | 3 ++
drivers/event/opdl/opdl_ring.c | 95 +---
drivers/event/opdl/opdl_ring.h | 16 +-
3 files changed, 85 insertions(+), 29 deletions(-)
diff --git a/drivers/event/opdl/opdl_evdev_init.c
b/drivers/event/opdl/opdl_evdev_init.c
index 1454de5..582ad69 100644
--- a/drivers/event/opdl/opdl_evdev_init.c
+++ b/drivers/event/opdl/opdl_evdev_init.c
@@ -733,6 +733,9 @@ initialise_all_other_ports(struct rte_eventdev *dev)
queue->ports[queue->nb_ports] = port;
port->instance_id = queue->nb_ports;
queue->nb_ports++;
+ opdl_stage_set_queue_id(stage_inst,
+ port->queue_id);
+
} else if (queue->q_pos == OPDL_Q_POS_END) {
/* tx port */
diff --git a/drivers/event/opdl/opdl_ring.c b/drivers/event/opdl/opdl_ring.c
index eca7712..0f40d31 100644
--- a/drivers/event/opdl/opdl_ring.c
+++ b/drivers/event/opdl/opdl_ring.c
@@ -25,7 +25,10 @@
#define OPDL_NAME_SIZE 64
-#define OPDL_EVENT_MASK (0x000FULL)
+#define OPDL_EVENT_MASK (0x000FULL)
+#define OPDL_FLOWID_MASK (0xF)
+#define OPDL_OPA_MASK (0xFF)
+#define OPDL_OPA_OFFSET (0x38)
int opdl_logtype_driver;
@@ -86,7 +89,6 @@ struct opdl_stage {
*/
uint32_t available_seq;
uint32_t head; /* Current head for single-thread operation */
- uint32_t shadow_head; /* Shadow head for single-thread operation */
uint32_t nb_instance; /* Number of instances */
uint32_t instance_id; /* ID of this stage instance */
uint16_t num_claimed; /* Number of slots claimed */
@@ -102,6 +104,9 @@ struct opdl_stage {
/* For managing disclaims in multi-threaded processing stages */
struct claim_manager pending_disclaims[RTE_MAX_LCORE]
__rte_cache_aligned;
+ uint32_t shadow_head; /* Shadow head for single-thread operation */
+ uint32_t queue_id; /* ID of Queue which is assigned to this stage */
+ uint32_t pos; /* Atomic scan position */
} __rte_cache_aligned;
/* Context for opdl_ring */
@@ -494,6 +499,9 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void
*entries,
uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
{
uint32_t i = 0, j = 0, offset;
+ uint32_t opa_id = 0;
+ uint32_t flow_id = 0;
+ uint64_t event= 0;
void *get_slots;
struct rte_event *ev;
RTE_SET_USED(seq);
@@ -520,7 +528,17 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void
*entries,
for (j = 0; j < num_entries; j++) {
ev = (struct rte_event *)get_slot(t, s->head+j);
- if ((ev->flow_id%s->nb_instance) == s->instance_id) {
+
+ event = __atomic_load_n(&(ev->event),
+ __ATOMIC_ACQUIRE);
+
+ opa_id = OPDL_OPA_MASK&(event>>OPDL_OPA_OFFSET);
+ flow_id = OPDL_FLOWID_MASK&event;
+
+ if (opa_id >= s->queue_id)
+ continue;
+
+ if ((flow_id%s->nb_instance) == s->instance_id) {
memcpy(entries_offset, ev, t->slot_size);
entries_offset += t->slot_size;
i++;
@@ -531,6 +549,7 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void
*entries,
s->head += num_entries;
s->num_claimed = num_entries;
s->num_event = i;
+ s->pos = 0;
/* automatically disclaim entries if number of rte_events is zero */
if (unlikely(i == 0))
@@ -953,14 +972,19 @@ opdl_ring_get_slot(const struct opdl_ring *t, uint32_t
index)
}
bool
-opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
uint32_t index, bool atomic)
{
- uint32_t i = 0, j = 0, offset;
+ uint32_t i = 0, offset;
struct opdl_ring *t = s->t;
struct rte_event *ev_orig = NULL;
bool ev_updated = false;
- uint64_t ev_temp = 0;
+ uint64_t ev_temp= 0;
+ uint64_t ev_update = 0;
+
+ uint32_t opa_id = 0;
+ uint32_t flow_id = 0;
+ uint64_t event= 0;