On 9 September 2016 at 18:12, Timmons C. Player <[email protected]>
wrote:

> Under high levels of memory page churn, the array of pages in the
> l1 page buffer pool can overflow and overwrite the data in adjacent
> memory pages.  This change replaces the array with a fixed sized ring.
>
> It also removes the looping behavior found in the refill and unfill
> functions.  Those functions are called by multiple threads and now
> only do the minimum amount of work necessary to allow the callers to
> continue.  Previously, the looping behavior in the refill thread caused
> the array overflow due to a race on the array size variable.
>

Could you elaborate on that race condition? I don't really understand
how it can happen. The L1 page pool is per-CPU, and all accesses to it are
done with preemption disabled, so it appears to be sufficiently protected.


>
> The pool fill thread is now solely responsible for maintaining
> the target pool size.
>
> Signed-off-by: Timmons C. Player <[email protected]>
> ---
>  core/mempool.cc | 67 ++++++++++++++++++++++++++++++++-------------------------
>  1 file changed, 38 insertions(+), 29 deletions(-)
>
> diff --git a/core/mempool.cc b/core/mempool.cc
> index 50c938f..fad046d 100644
> --- a/core/mempool.cc
> +++ b/core/mempool.cc
> @@ -1099,9 +1099,12 @@ struct l1 {
>      }
>      static void* alloc_page_local();
>      static bool free_page_local(void* v);
> -    void* pop() { return _pages[--nr]; }
> -    void push(void* page) { _pages[nr++] = page; }
> -    void* top() { return _pages[nr - 1]; }
> +    void* pop() {
> +        void *page = nullptr;
> +        return (_pages.pop(page) ? page : nullptr);
> +    }
> +    bool push(void *page) { return _pages.push(page); }
> +    unsigned size() { return _pages.size(); }
>      void wake_thread() { _fill_thread.wake(); }
>      static void fill_thread();
>      static void refill();
> @@ -1110,11 +1113,12 @@ struct l1 {
>      static constexpr size_t max = 512;
>      static constexpr size_t watermark_lo = max * 1 / 4;
>      static constexpr size_t watermark_hi = max * 3 / 4;
> -    size_t nr = 0;
>
>  private:
>      sched::thread _fill_thread;
> -    void* _pages[max];
> +
> +    typedef ring_spsc<void *, max> page_ring;
> +    page_ring _pages;
>  };
>
>  struct page_batch {
> @@ -1156,7 +1160,7 @@ public:
>          page_batch* pb;
>          while (!(pb = try_alloc_page_batch()) &&
>                  // Check again since someone else might change pbuf.nr when we sleep
> -                (pbuf.nr + page_batch::nr_pages < pbuf.max / 2)) {
> +                (pbuf.size() + page_batch::nr_pages < pbuf.max / 2)) {
>              WITH_LOCK(migration_lock) {
>                  DROP_LOCK(preempt_lock) {
>                      refill();
> @@ -1243,14 +1247,19 @@ void l1::fill_thread()
>      for (;;) {
>          sched::thread::wait_until([&] {
>                  WITH_LOCK(preempt_lock) {
> -                    return pbuf.nr < pbuf.watermark_lo || pbuf.nr > pbuf.watermark_hi;
> +                    auto nr = pbuf.size();
> +                    return nr < pbuf.watermark_lo || nr > pbuf.watermark_hi;
>                  }
>          });
> -        if (pbuf.nr < pbuf.watermark_lo) {
> -            refill();
> +        if (pbuf.size() < pbuf.watermark_lo) {
> +            while (pbuf.size() < pbuf.max / 2) {
> +                refill();
> +            }
>          }
> -        if (pbuf.nr > pbuf.watermark_hi) {
> -            unfill();
> +        if (pbuf.size() > pbuf.watermark_hi) {
> +            while (pbuf.size() > pbuf.max / 2) {
> +                unfill();
> +            }
>          }
>      }
>  }
> @@ -1259,12 +1268,17 @@ void l1::refill()
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    while (pbuf.nr + page_batch::nr_pages < pbuf.max / 2) {
> -        auto* pb = global_l2.alloc_page_batch(pbuf);
> -        if (pb) {
> +    auto* pb = global_l2.alloc_page_batch(pbuf);
> +    if (pb) {
> +        // Another thread might have filled the ring while we waited for
> +        // the page batch.  Make sure there is enough room to add the pages
> +        // we just acquired, otherwise return them.
> +        if (pbuf.size() + page_batch::nr_pages <= pbuf.max) {
>              for (auto& page : pb->pages) {
> -                pbuf.push(page);
> +                assert(pbuf.push(page));
>              }
> +        } else {
> +            global_l2.free_page_batch(pb);
>          }
>      }
>  }
> @@ -1273,10 +1287,12 @@ void l1::unfill()
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    while (pbuf.nr > page_batch::nr_pages + pbuf.max / 2) {
> -        auto* pb = static_cast<page_batch*>(pbuf.top());
> -        for (size_t i = 0 ; i < page_batch::nr_pages; i++) {
> -            pb->pages[i] = pbuf.pop();
> +    // Don't bother unfilling unless we are actually above our target size.
> +    if (pbuf.size() > pbuf.max / 2) {
> +        auto* pb = static_cast<page_batch*>(pbuf.pop());
> +        pb->pages[0] = pb;
> +        for (size_t i = 1 ; i < page_batch::nr_pages; i++) {
> +            assert(pb->pages[i] = pbuf.pop());
>          }
>          global_l2.free_page_batch(pb);
>      }
> @@ -1286,12 +1302,9 @@ void* l1::alloc_page_local()
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    if (pbuf.nr < pbuf.watermark_lo) {
> +    if (pbuf.size() < pbuf.watermark_lo) {
>          pbuf.wake_thread();
>      }
> -    if (pbuf.nr == 0) {
> -        return nullptr;
> -    }
>      return pbuf.pop();
>  }
>
> @@ -1299,14 +1312,10 @@ bool l1::free_page_local(void* v)
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    if (pbuf.nr > pbuf.watermark_hi) {
> +    if (pbuf.size() > pbuf.watermark_hi) {
>          pbuf.wake_thread();
>      }
> -    if (pbuf.nr == pbuf.max) {
> -        return false;
> -    }
> -    pbuf.push(v);
> -    return true;
> +    return pbuf.push(v);
>  }
>
>  // Global thread for L2 page pool
> --
> 2.7.4
>
> --
> You received this message because you are subscribed to the Google Groups
> "OSv Development" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> For more options, visit https://groups.google.com/d/optout.
>

-- 
You received this message because you are subscribed to the Google Groups "OSv 
Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
For more options, visit https://groups.google.com/d/optout.

Reply via email to