Thanks for this patch. Looks serious, and I wonder if it explains
https://github.com/cloudius-systems/osv/issues/755

Some comments and questions below.

On Fri, Sep 9, 2016 at 8:12 PM, Timmons C. Player <
timmons.pla...@spirent.com> wrote:

> Under high levels of memory page churn, the array of pages in the
> l1 page buffer pool can overflow and overwrite the data in adjacent
> memory pages.  This change replaces the array with a fixed sized ring.
>
> It also removes the looping behavior found in the refill and unfill
> functions.  Those functions are called by multiple threads and now
> only do the minimum amount of work necessary to allow the callers to
> continue.  Previously, the looping behavior in the refill thread caused
> the array overflow due to a race on the array size variable.
>
> The pool fill thread is now solely responsible for maintaining
> the target pool size.
>
> Signed-off-by: Timmons C. Player <timmons.pla...@spirent.com>
> ---
>  core/mempool.cc | 67 ++++++++++++++++++++++++++++++
> ++-------------------------
>  1 file changed, 38 insertions(+), 29 deletions(-)
>
> diff --git a/core/mempool.cc b/core/mempool.cc
> index 50c938f..fad046d 100644
> --- a/core/mempool.cc
> +++ b/core/mempool.cc
> @@ -1099,9 +1099,12 @@ struct l1 {
>      }
>      static void* alloc_page_local();
>      static bool free_page_local(void* v);
> -    void* pop() { return _pages[--nr]; }
> -    void push(void* page) { _pages[nr++] = page; }
> -    void* top() { return _pages[nr - 1]; }
> +    void* pop() {
> +        void *page = nullptr;
>

Nitpick (not important compared to the bigger comments below):

I think that setting this to nullptr is redundant, and confusing (it led me
to think there is a reason why it needs to be set).
If you leave this default value anyway, you could do
    _pages.pop(page);
    return page;
instead of the ?: expression below. Maybe that's what you intended to do?
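
I.e., the whole method would then be just (a sketch only, assuming
ring_spsc::pop() leaves its argument untouched when the ring is empty,
which is what your default value suggests):

    void* pop() {
        void* page = nullptr;   // returned as-is when the ring is empty
        _pages.pop(page);
        return page;
    }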


> +        return (_pages.pop(page) ? page : nullptr);
> +    }
> +    bool push(void *page) { return _pages.push(page); }
> +    unsigned size() { return _pages.size(); }
>      void wake_thread() { _fill_thread.wake(); }
>      static void fill_thread();
>      static void refill();
> @@ -1110,11 +1113,12 @@ struct l1 {
>      static constexpr size_t max = 512;
>      static constexpr size_t watermark_lo = max * 1 / 4;
>      static constexpr size_t watermark_hi = max * 3 / 4;
> -    size_t nr = 0;
>
>  private:
>      sched::thread _fill_thread;
> -    void* _pages[max];
> +
> +    typedef ring_spsc<void *, max> page_ring;

> +    page_ring _pages;
>

I don't understand why you needed to change that array to a ring_spsc.

First, about the boundary checking - sure, the push() and pop()
implementations look scary, not checking their boundaries (we should at
least have had an assert there...), but the code which calls push(), for
example, is l1::free_page_local(), and that would not call push() if it
had already reached the maximum size.
Second, about the FIFO vs LIFO order - I assume that's not why you
switched to the ring.

So the most dramatic change is that you changed the non-atomic array to a
ring_spsc, which allows two different threads to read and write this
buffer concurrently. But unless I'm misunderstanding something, the
intention of this code was that the so-called L1 page pool (the per-cpu
pool of pages to be allocated) would only be handled by threads running
on a single CPU, and these threads use preemption-disable (preempt_lock)
as a way to ensure that the different consumers (callers of
l1::alloc_page_local()) and the different producers (callers of
l1::free_page_local()) never run concurrently.

So unless I'm misunderstanding something, changing to ring_spsc is not
necessary here. We might have a different bug somewhere else - a place
where we do not hold the preempt_lock long enough while modifying the
buffer.

Looking forward in the code a bit, I suspect the DROP_LOCK in
alloc_page_batch(). But I'll continue reading your patch now...
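
To spell out the scenario I have in mind (just my reading of the old
code, so I may well be wrong):

    // old l1::refill(), running on CPU 0, holding preempt_lock:
    //   sees pbuf.nr + page_batch::nr_pages < pbuf.max / 2, so there is room
    //   calls global_l2.alloc_page_batch(pbuf)
    //     -> DROP_LOCK(preempt_lock): we can now be preempted on this CPU
    //        another thread on the same CPU calls free_page_local() (or runs
    //        its own refill()) and grows pbuf.nr past the value we checked
    //     -> preempt_lock re-taken, batch returned
    //   pushes page_batch::nr_pages pages without re-checking the size,
    //   so _pages[nr++] runs past the end of the array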


>  };
>
>  struct page_batch {
> @@ -1156,7 +1160,7 @@ public:
>          page_batch* pb;
>          while (!(pb = try_alloc_page_batch()) &&
>                  // Check again since someone else might change pbuf.nr
> when we sleep
> -                (pbuf.nr + page_batch::nr_pages < pbuf.max / 2)) {
> +                (pbuf.size() + page_batch::nr_pages < pbuf.max / 2)) {
>              WITH_LOCK(migration_lock) {
>                  DROP_LOCK(preempt_lock) {
>                      refill();
> @@ -1243,14 +1247,19 @@ void l1::fill_thread()
>      for (;;) {
>          sched::thread::wait_until([&] {
>                  WITH_LOCK(preempt_lock) {
> -                    return pbuf.nr < pbuf.watermark_lo || pbuf.nr >
> pbuf.watermark_hi;
> +                    auto nr = pbuf.size();
> +                    return nr < pbuf.watermark_lo || nr >
> pbuf.watermark_hi;
>                  }
>          });
> -        if (pbuf.nr < pbuf.watermark_lo) {
>

Note (not very relevant to your patch): The fact that the original code
used pbuf.nr without the preempt_lock protection wasn't great, but I think
it is not a big problem: this thread is pinned to one CPU (so preempt_lock
isn't needed to protect against migration), and concurrent
increment/decrement shouldn't cause a problem here, I think (not to mention
that inc/dec would be atomic on a single cpu).



> -            refill();
> +        if (pbuf.size() < pbuf.watermark_lo) {
> +            while (pbuf.size() < pbuf.max / 2) {
> +                refill();
> +            }
>
>          }
> -        if (pbuf.nr > pbuf.watermark_hi) {
> -            unfill();
> +        if (pbuf.size() > pbuf.watermark_hi) {
> +            while (pbuf.size() > pbuf.max / 2) {
> +                unfill();
> +            }
>          }
>      }
>  }
> @@ -1259,12 +1268,17 @@ void l1::refill()
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    while (pbuf.nr + page_batch::nr_pages < pbuf.max / 2) {
> -        auto* pb = global_l2.alloc_page_batch(pbuf);
> -        if (pb) {
> +    auto* pb = global_l2.alloc_page_batch(pbuf);


So alloc_page_batch() dropped the preempt_lock momentarily (it did us the
courtesy of preventing migration of the thread to another CPU, but I
believe this is actually not necessary - we could have called get_l1()
again after alloc_page_batch() and not avoided migration).

Wasn't the code here the real bug, and your fix here the real fix? I mean,
won't this fix, of testing the room on the array again after calling
alloc_page_batch(), work correctly without the rest of the changes to move
to a ring_spsc?

By the way, looking at the (what I thought was unnecessary) migration lock
in alloc_page_batch(), I am starting to think that passing pbuf to this
function is unnecessary: it seems pbuf is only used there for a test that
looks completely unnecessary to me, now that you anyway do this test, more
correctly, in the caller.  But I guess we can consider this later.


> +    if (pb) {
> +        // Another thread might have filled the ring while we waited for
> +        // the page batch.  Make sure there is enough room to add the
> pages
> +        // we just acquired, otherwise return them.
>

Note that the original code didn't intend to fill the entire array, but
rather only half of it, but maybe it doesn't hurt to use the entire batch
if we already got it. So I think what you did is fine.


> +        if (pbuf.size() + page_batch::nr_pages <= pbuf.max) {
>              for (auto& page : pb->pages) {
> -                pbuf.push(page);
> +                assert(pbuf.push(page));
>

I wince when I see an assert with side effects (as you know, other
implementations, like KASSERT for example, compile the entire assert out
on release builds). But this is actually fine with OSv's assert(). Still,
it might be better style to rewrite this as two statements?
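
E.g., something like:

    bool pushed = pbuf.push(page);
    assert(pushed);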


>              }
> +        } else {
> +            global_l2.free_page_batch(pb);
>          }
>      }
>  }
> @@ -1273,10 +1287,12 @@ void l1::unfill()
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    while (pbuf.nr > page_batch::nr_pages + pbuf.max / 2) {
> -        auto* pb = static_cast<page_batch*>(pbuf.top());
> -        for (size_t i = 0 ; i < page_batch::nr_pages; i++) {
> -            pb->pages[i] = pbuf.pop();
> +    // Don't bother unfilling unless we are actually above our target
> size.
> +    if (pbuf.size() > pbuf.max / 2) {
>

Here you changed the right hand side of the inequality a bit. Why?


> +        auto* pb = static_cast<page_batch*>(pbuf.pop());
> +        pb->pages[0] = pb;
> +        for (size_t i = 1 ; i < page_batch::nr_pages; i++) {
> +            assert(pb->pages[i] = pbuf.pop());
>

Again, an assert with side effects - not very desirable (but works
correctly on OSv).
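
Same suggestion here, e.g.:

    pb->pages[i] = pbuf.pop();
    assert(pb->pages[i]);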

>          }
>          global_l2.free_page_batch(pb);
>      }
> @@ -1286,12 +1302,9 @@ void* l1::alloc_page_local()
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    if (pbuf.nr < pbuf.watermark_lo) {
> +    if (pbuf.size() < pbuf.watermark_lo) {
>          pbuf.wake_thread();
>      }
> -    if (pbuf.nr == 0) {
> -        return nullptr;
> -    }
>      return pbuf.pop();
>  }
>
> @@ -1299,14 +1312,10 @@ bool l1::free_page_local(void* v)
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    if (pbuf.nr > pbuf.watermark_hi) {
> +    if (pbuf.size() > pbuf.watermark_hi) {
>          pbuf.wake_thread();
>      }
> -    if (pbuf.nr == pbuf.max) {
> -        return false;
> -    }
> -    pbuf.push(v);
> -    return true;
> +    return pbuf.push(v);
>  }
>
>  // Global thread for L2 page pool
> --
> 2.7.4
>