On Mon, 2025-08-04 at 10:20 +0800, Xu Kuohai wrote:

[...]

> @@ -278,6 +293,92 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n)
>       return cnt;
>  }
>  
> +static int64_t ringbuf_process_overwrite_ring(struct ring *r, size_t n)
> +{
> +
> +     int err;
> +     uint32_t *len_ptr, len;
> +     /* 64-bit to avoid overflow in case of extreme application behavior */
> +     int64_t cnt = 0;
> +     size_t size, offset;
> +     unsigned long cons_pos, prod_pos, over_pos, tmp_pos;
> +     bool got_new_data;
> +     void *sample;
> +     bool copied;
> +
> +     size = r->mask + 1;
> +
> +     cons_pos = smp_load_acquire(r->consumer_pos);
> +     do {
> +             got_new_data = false;
> +
> +             /* grab a copy of data */
> +             prod_pos = smp_load_acquire(r->producer_pos);
> +             do {
> +                     over_pos = READ_ONCE(*r->overwrite_pos);
> +                     /* prod_pos may be outdated now */
> +                     if (over_pos < prod_pos) {
> +                             tmp_pos = max(cons_pos, over_pos);
> +                             /* smp_load_acquire(r->producer_pos) before
> +                              * READ_ONCE(*r->overwrite_pos) ensures that
> +                              * over_pos + r->mask < prod_pos never occurs,
> +                              * so size is never larger than r->mask
> +                              */
> +                             size = prod_pos - tmp_pos;
> +                             if (!size)
> +                                     goto done;
> +                             memcpy(r->read_buffer,
> +                                    r->data + (tmp_pos & r->mask), size);
> +                             copied = true;
> +                     } else {
> +                             copied = false;
> +                     }
> +                     prod_pos = smp_load_acquire(r->producer_pos);
> +             /* retry if data is overwritten by producer */
> +             } while (!copied || prod_pos - tmp_pos > r->mask);

Could you please elaborate a bit on why this condition is sufficient to
guarantee that r->overwrite_pos had not changed while memcpy() was
executing?

> +
> +             cons_pos = tmp_pos;
> +
> +             for (offset = 0; offset < size; offset += roundup_len(len)) {
> +                     len_ptr = r->read_buffer + (offset & r->mask);
> +                     len = *len_ptr;
> +
> +                     if (len & BPF_RINGBUF_BUSY_BIT)
> +                             goto done;
> +
> +                     got_new_data = true;
> +                     cons_pos += roundup_len(len);
> +
> +                     if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
> +                             sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
> +                             err = r->sample_cb(r->ctx, sample, len);
> +                             if (err < 0) {
> +                                     /* update consumer pos and bail out */
> +                                     smp_store_release(r->consumer_pos,
> +                                                       cons_pos);
> +                                     return err;
> +                             }
> +                             cnt++;
> +                     }
> +
> +                     if (cnt >= n)
> +                             goto done;
> +             }
> +     } while (got_new_data);
> +
> +done:
> +     smp_store_release(r->consumer_pos, cons_pos);
> +     return cnt;
> +}

[...]

Reply via email to