Bill Fischofer(Bill-Fischofer-Linaro) replied on github web page: platform/linux-generic/include/odp_ring_st_internal.h line 78 @@ -0,0 +1,118 @@ +/* Copyright (c) 2018, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_RING_ST_INTERNAL_H_ +#define ODP_RING_ST_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <odp/api/hints.h> +#include <odp_align_internal.h> + +/* Basic ring for single thread usage. Operations must be synchronized by using + * locks (or other means), when multiple threads use the same ring. */ +typedef struct { + uint32_t head; + uint32_t tail; + uint32_t mask; + uint32_t *data; + +} ring_st_t; + +/* Initialize ring. Ring size must be a power of two. */ +static inline void ring_st_init(ring_st_t *ring, uint32_t *data, uint32_t size) +{ + ring->head = 0; + ring->tail = 0; + ring->mask = size - 1; + ring->data = data; +} + +/* Dequeue data from the ring head. Max_num is smaller than ring size.*/ +static inline uint32_t ring_st_deq_multi(ring_st_t *ring, uint32_t data[], + uint32_t max_num) +{ + uint32_t head, tail, mask, idx; + uint32_t num, i; + + head = ring->head; + tail = ring->tail; + mask = ring->mask; + num = tail - head; + + /* Empty */ + if (num == 0) + return 0; + + if (num > max_num) + num = max_num; + + idx = head & mask; + + for (i = 0; i < num; i++) { + data[i] = ring->data[idx]; + idx = (idx + 1) & mask; + } + + ring->head = head + num; + + return num; +} + +/* Enqueue data into the ring tail. Num_data is smaller than ring size. */ +static inline uint32_t ring_st_enq_multi(ring_st_t *ring, const uint32_t data[], + uint32_t num_data) +{ + uint32_t head, tail, mask, size, idx; + uint32_t num, i; + + head = ring->head; + tail = ring->tail; + mask = ring->mask; + size = mask + 1; + num = size - (tail - head);
Comment: Since you're allowing `head` and `tail` to run over the entire 32-bit range, you're correct that you can completely fill the ring. I did miss that point. However, as shown above you still need this to be: ``` num = size - abs(tail - head); ``` To avoid problems at the 32-bit wrap boundary. > Bill Fischofer(Bill-Fischofer-Linaro) wrote: > You're computing in 32 bits not 8 bits, and your ring size is less than 2^32 > elements. Consider the following test program: > ``` > #include <stdlib.h> > #include <stdio.h> > #include <stdint.h> > #include <inttypes.h> > > void main() { > uint32_t head[4] = {0, 1, 2, 3}; > uint32_t tail[4] = {0, 0xffffffff, 0xfffffffe, 0xfffffffd}; > uint32_t mask = 4095; > uint32_t result; > int i; > > for (i = 0; i < 4; i++) { > printf("head = %" PRIu32 " tail = %" PRIu32 ":\n", > head[i], tail[i]); > > result = tail[i] - head[i]; > printf("tail - head = %" PRIu32 "\n", result); > > result = (tail[i] - head[i]) & mask; > printf("(tail - head) & mask = %" PRIu32 "\n", result); > > result = abs(tail[i] - head[i]); > printf("abs(tail - head) = %" PRIu32 "\n\n", result); > } > } > ``` > in theory `tail - head` should be the number of elements in the ring, in this > case 0, 2, 4, and 6. But running this test program gives the following output: > ``` > head = 0 tail = 0: > tail - head = 0 > (tail - head) & mask = 0 > abs(tail - head) = 0 > > head = 1 tail = 4294967295: > tail - head = 4294967294 > (tail - head) & mask = 4094 > abs(tail - head) = 2 > > head = 2 tail = 4294967294: > tail - head = 4294967292 > (tail - head) & mask = 4092 > abs(tail - head) = 4 > > head = 3 tail = 4294967293: > tail - head = 4294967290 > (tail - head) & mask = 4090 > abs(tail - head) = 6 > ``` > Since you're allowing head to run free over the 32-bit range of the variable, > when the 32-bits rolls over you'll get a large positive number, not the small > one you need to stay within the ring bounds. The alternative is to mask > `head` and `tail` as you increment them, but then you run into the effective > range issue. >> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >> OK >>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>> OK, the `ODP_ASSERT()` would still be useful for debugging. >>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>> That functionality is not obvious from the name. It either implies that >>>> one of the input arguments is written (not true here) or the reader might >>>> assume that it is an expression without side-effect and should be deleted >>>> (what I originally thought when reading it). You should pick a routine >>>> name that makes it clear it's actually doing something real, in this case >>>> performing prefetch processing. >>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>> Elsewhere you write `if (ring_st_is_empty(...))`, not `if >>>>> (ring_st_is_empty(...) == 1)` so this is inconsistent. >>>>>> Petri Savolainen(psavol) wrote: >>>>>> Didn't try larger than 32. 32 is already quite large from QoS point of >>>>>> view. >>>>>> >>>>>> I'm planning to use config file for run time tunning, so this hard >>>>>> coding may change in that phase. >>>>>>> Petri Savolainen(psavol) wrote: >>>>>>> One entry is not lost. >>>>>>>> Petri Savolainen(psavol) wrote: >>>>>>>> One entry is not lost. User provided size if not (currently) used. >>>>>>>> Queue size is always 4k. >>>>>>>>> Petri Savolainen(psavol) wrote: >>>>>>>>> One entry is not lost. >>>>>>>>>> Petri Savolainen(psavol) wrote: >>>>>>>>>> One entry is not lost. >>>>>>>>>>> Petri Savolainen(psavol) wrote: >>>>>>>>>>> OK, added checks in v2. >>>>>>>>>>>> Petri Savolainen(psavol) wrote: >>>>>>>>>>>> OK. Compiler probably did that already, but changed in v2. >>>>>>>>>>>>> Petri Savolainen(psavol) wrote: >>>>>>>>>>>>> Tail and head indexes are (masked from) uint32_t and do not wrap >>>>>>>>>>>>> around when the ring is full. I think you assume that the store >>>>>>>>>>>>> index is 0...size-1, while it's full uint32_t which is then >>>>>>>>>>>>> masked to get the actual index. >>>>>>>>>>>>> >>>>>>>>>>>>> For example: >>>>>>>>>>>>> size = 100; >>>>>>>>>>>>> >>>>>>>>>>>>> Empty: >>>>>>>>>>>>> head = 100 >>>>>>>>>>>>> tail = 100 >>>>>>>>>>>>> num = 100 - 100 = 0 >>>>>>>>>>>>> >>>>>>>>>>>>> Full: >>>>>>>>>>>>> head = 100 >>>>>>>>>>>>> tail = 200 >>>>>>>>>>>>> num = 200 - 100 = 100 >>>>>>>>>>>>> >>>>>>>>>>>>> Wrap uint32_t + full: >>>>>>>>>>>>> head = 0xFFFFFF9C >>>>>>>>>>>>> tail = 0 >>>>>>>>>>>>> num = 0 - 0xFFFFFF9C = 0x64 = 100 >>>>>>>>>>>>> >>>>>>>>>>>>> So, no abs() needed. Ring size can be 4096, instead of 4095. >>>>>>>>>>>>>> Petri Savolainen(psavol) wrote: >>>>>>>>>>>>>> It's already documented 5 lines above: >>>>>>>>>>>>>> >>>>>>>>>>>>>> /* Initialize ring. Ring size must be a power of two. */ >>>>>>>>>>>>>> static inline void ring_st_init(ring_st_t *ring, uint32_t *data, >>>>>>>>>>>>>> uint32_t size) >>>>>>>>>>>>>> { >>>>>>>>>>>>>>> Petri Savolainen(psavol) wrote: >>>>>>>>>>>>>>> This function converts 32 bit buffer indexes to buffer header >>>>>>>>>>>>>>> pointers. The counter operation is buffer_index_from_buf(). The >>>>>>>>>>>>>>> prefetch is a side effect of the function, which may be >>>>>>>>>>>>>>> changed/moved any time if it's found out that there's a place >>>>>>>>>>>>>>> for prefetching. I actually plan to test if number of >>>>>>>>>>>>>>> prefetches should be limited as e.g. 32 consecutive prefetches >>>>>>>>>>>>>>> may be too much for some CPU architectures. >>>>>>>>>>>>>>>> Petri Savolainen(psavol) wrote: >>>>>>>>>>>>>>>> I prefer style where '== 0' is used instead of '!'. >>>>>>>>>>>>>>>> Especially, when the if clause is as complex as this and >>>>>>>>>>>>>>>> there's danger for reader to miss the '!' sign. >>>>>>>>>>>>>>>>> Petri Savolainen(psavol) wrote: >>>>>>>>>>>>>>>>> It's there to ensure that all bits are zero also when someone >>>>>>>>>>>>>>>>> would modify the bitfield from two to three fields later on. >>>>>>>>>>>>>>>>> Similarly to memset() zero is used for struct inits. >>>>>>>>>>>>>>>>>> Petri Savolainen(psavol) wrote: >>>>>>>>>>>>>>>>>> There's no need for abs(). Since it's all uint32_t >>>>>>>>>>>>>>>>>> variables, wrap a round is handled already. >>>>>>>>>>>>>>>>>> An example in 8bits: >>>>>>>>>>>>>>>>>> 0xff - 0xfd = 0x02 >>>>>>>>>>>>>>>>>> 0x00 - 0xfe = 0x02 >>>>>>>>>>>>>>>>>> 0x01 - 0xff = 0x02 >>>>>>>>>>>>>>>>>> 0x02 - 0x00 = 0x02 >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> This passes both gcc and clang, and is used already in the >>>>>>>>>>>>>>>>>> other ring implementation see ring_deq_multi(). >>>>>>>>>>>>>>>>>>> Petri Savolainen(psavol) wrote: >>>>>>>>>>>>>>>>>>> I prefer style with blank line in the end of a typedef, >>>>>>>>>>>>>>>>>>> since it's easier to spot the type name (as it's not mixed >>>>>>>>>>>>>>>>>>> into struct field names). Checkpatch passes so this style >>>>>>>>>>>>>>>>>>> should be OK. >>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>>>>>>>>>>>>>>>>> Does this mean that sizes larger than 32 have no added >>>>>>>>>>>>>>>>>>>> performance benefit? >>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>>>>>>>>>>>>>>>>>> Must use `CONFIG_QUEUE_SIZE - 1` here, as noted earlier, >>>>>>>>>>>>>>>>>>>>> if we're not going to use the user-supplied queue size. >>>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>>>>>>>>>>>>>>>>>>> Given its name, this looks like an extraneous statement >>>>>>>>>>>>>>>>>>>>>> that should be deleted. Renaming this to something like >>>>>>>>>>>>>>>>>>>>>> `prefetch_dequeued_bufs()` would make the intent clearer >>>>>>>>>>>>>>>>>>>>>> here. >>>>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>>>>>>>>>>>>>>>>>>>> `if (!ring_st_is_empty(&queue->s.ring_st))` seems more >>>>>>>>>>>>>>>>>>>>>>> natural here. >>>>>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>>>>>>>>>>>>>>>>>>>>> Change to `if (param->size >= CONFIG_QUEUE_SIZE)` to >>>>>>>>>>>>>>>>>>>>>>>> handle the effective queue capacity. The user-supplied >>>>>>>>>>>>>>>>>>>>>>>> `size` should then be set to `ROUNDUP_POWER2_U32(size) >>>>>>>>>>>>>>>>>>>>>>>> - 1` for the masking to work properly. >>>>>>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>>>>>>>>>>>>>>>>>>>>>> Same comment here as for plain queues. >>>>>>>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>>>>>>>>>>>>>>>>>>>>>>> As noted earlier, due to "losing" one entry to >>>>>>>>>>>>>>>>>>>>>>>>>> distinguish queue empty/full, this should be >>>>>>>>>>>>>>>>>>>>>>>>>> returned as `CONFIG_QUEUE_SIZE - 1`, and we also >>>>>>>>>>>>>>>>>>>>>>>>>> need to ensure that `CONFIG_QUEUE_SIZE` is itself a >>>>>>>>>>>>>>>>>>>>>>>>>> power of 2. >>>>>>>>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>> Since you're initializing `index.pool` and >>>>>>>>>>>>>>>>>>>>>>>>>>> `index.buffer` there's no need to set `index.u32` >>>>>>>>>>>>>>>>>>>>>>>>>>> here. >>>>>>>>>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>> We originally had this index partitioning based on >>>>>>>>>>>>>>>>>>>>>>>>>>>> `ODP_CONFIG_POOLS`. Do we want to return to that >>>>>>>>>>>>>>>>>>>>>>>>>>>> here? >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> If not, we at least need an `ODP_STATIC_ASSERT()` >>>>>>>>>>>>>>>>>>>>>>>>>>>> to ensure that `ODP_CONFIG_POOLS < 256` or else >>>>>>>>>>>>>>>>>>>>>>>>>>>> bad things will happen here. >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>> This routine can be optimized to: >>>>>>>>>>>>>>>>>>>>>>>>>>>>> ``` >>>>>>>>>>>>>>>>>>>>>>>>>>>>> return ring->head == ring->tail; >>>>>>>>>>>>>>>>>>>>>>>>>>>>> ``` >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Your invariant is the queue is empty when `head >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> == tail` therefore the queue is full when >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `abs(tail - head) == mask`, so the correct >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> calculation here is: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `num = mask - abs(tail - head);` >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The effect is that a queue can only hold `size - >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1` elements, otherwise you cannot distinguish >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between a full and an empty queue without >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> another bit of metadata, which is a cost you're >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> trying to avoid. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is somewhat problematic if the caller is >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> trying to be "optimal" by specifying a power of >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> two in the `size` parameter of the >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `odp_queue_param_t` passed to >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `odp_queue_create()`. For this reason we may >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wish to return a `max_size` of a power of 2 - 1 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in `odp_queue_capability()` as part of this >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> patch series. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This only works if `size` is a power of 2. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Should be documented as such, since this is an >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> internal routine. In this case an >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `ODP_ASSERT(size == ROUNDUP_POWER2_U32(size))` >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for this requirement would be a useful >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> debugging aid. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should be `num = abs(tail - head);` to deal >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with wrap arounds, otherwise may be >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> misinterpreted as overly large since it's >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `uint32_t`. Note that GCC and clang recognize >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> `abs()` and treat it as a builtin, so there's >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> no actual `stdlib.h` call here. >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Extra blank line should be removed (nit). https://github.com/Linaro/odp/pull/492#discussion_r170150669 updated_at 2018-02-23 02:21:54