On 9 September 2016 at 18:12, Timmons C. Player <[email protected]> wrote:
> Under high levels of memory page churn, the array of pages in the
> l1 page buffer pool can overflow and overwrite the data in adjacent
> memory pages.  This change replaces the array with a fixed sized ring.
>
> It also removes the looping behavior found in the refill and unfill
> functions.  Those functions are called by multiple threads and now
> only do the minimum amount of work necessary to allow the callers to
> continue.  Previously, the looping behavior in the refill thread caused
> the array overflow due to a race on the array size variable.
>

Could you elaborate more on that race condition? I don't really
understand why it can happen. L1 page pool is per-CPU and all accesses
to it are done with preemption disabled so it appears to be
sufficiently protected.

>
> The pool fill thread is now solely responsible for maintaining
> the target pool size.
>
> Signed-off-by: Timmons C. Player <[email protected]>
> ---
>  core/mempool.cc | 67 ++++++++++++++++++++++++++++++++-------------------------
>  1 file changed, 38 insertions(+), 29 deletions(-)
>
> diff --git a/core/mempool.cc b/core/mempool.cc
> index 50c938f..fad046d 100644
> --- a/core/mempool.cc
> +++ b/core/mempool.cc
> @@ -1099,9 +1099,12 @@ struct l1 {
>      }
>      static void* alloc_page_local();
>      static bool free_page_local(void* v);
> -    void* pop() { return _pages[--nr]; }
> -    void push(void* page) { _pages[nr++] = page; }
> -    void* top() { return _pages[nr - 1]; }
> +    void* pop() {
> +        void *page = nullptr;
> +        return (_pages.pop(page) ? page : nullptr);
> +    }
> +    bool push(void *page) { return _pages.push(page); }
> +    unsigned size() { return _pages.size(); }
>      void wake_thread() { _fill_thread.wake(); }
>      static void fill_thread();
>      static void refill();
> @@ -1110,11 +1113,12 @@ struct l1 {
>      static constexpr size_t max = 512;
>      static constexpr size_t watermark_lo = max * 1 / 4;
>      static constexpr size_t watermark_hi = max * 3 / 4;
> -    size_t nr = 0;
>
>  private:
>      sched::thread _fill_thread;
> -    void* _pages[max];
> +
> +    typedef ring_spsc<void *, max> page_ring;
> +    page_ring _pages;
>  };
>
>  struct page_batch {
> @@ -1156,7 +1160,7 @@ public:
>          page_batch* pb;
>          while (!(pb = try_alloc_page_batch()) &&
>                 // Check again since someone else might change pbuf.nr when we sleep
> -               (pbuf.nr + page_batch::nr_pages < pbuf.max / 2)) {
> +               (pbuf.size() + page_batch::nr_pages < pbuf.max / 2)) {
>              WITH_LOCK(migration_lock) {
>                  DROP_LOCK(preempt_lock) {
>                      refill();
> @@ -1243,14 +1247,19 @@ void l1::fill_thread()
>      for (;;) {
>          sched::thread::wait_until([&] {
>                  WITH_LOCK(preempt_lock) {
> -                    return pbuf.nr < pbuf.watermark_lo || pbuf.nr > pbuf.watermark_hi;
> +                    auto nr = pbuf.size();
> +                    return nr < pbuf.watermark_lo || nr > pbuf.watermark_hi;
>                  }
>          });
> -        if (pbuf.nr < pbuf.watermark_lo) {
> -            refill();
> +        if (pbuf.size() < pbuf.watermark_lo) {
> +            while (pbuf.size() < pbuf.max / 2) {
> +                refill();
> +            }
>          }
> -        if (pbuf.nr > pbuf.watermark_hi) {
> -            unfill();
> +        if (pbuf.size() > pbuf.watermark_hi) {
> +            while (pbuf.size() > pbuf.max / 2) {
> +                unfill();
> +            }
>          }
>      }
>  }
> @@ -1259,12 +1268,17 @@ void l1::refill()
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    while (pbuf.nr + page_batch::nr_pages < pbuf.max / 2) {
> -        auto* pb = global_l2.alloc_page_batch(pbuf);
> -        if (pb) {
> +    auto* pb = global_l2.alloc_page_batch(pbuf);
> +    if (pb) {
> +        // Another thread might have filled the ring while we waited for
> +        // the page batch.  Make sure there is enough room to add the pages
> +        // we just acquired, otherwise return them.
> +        if (pbuf.size() + page_batch::nr_pages <= pbuf.max) {
>              for (auto& page : pb->pages) {
> -                pbuf.push(page);
> +                assert(pbuf.push(page));
>              }
> +        } else {
> +            global_l2.free_page_batch(pb);
>          }
>      }
>  }
> @@ -1273,10 +1287,12 @@ void l1::unfill()
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    while (pbuf.nr > page_batch::nr_pages + pbuf.max / 2) {
> -        auto* pb = static_cast<page_batch*>(pbuf.top());
> -        for (size_t i = 0 ; i < page_batch::nr_pages; i++) {
> -            pb->pages[i] = pbuf.pop();
> +    // Don't bother unfilling unless we are actually above our target size.
> +    if (pbuf.size() > pbuf.max / 2) {
> +        auto* pb = static_cast<page_batch*>(pbuf.pop());
> +        pb->pages[0] = pb;
> +        for (size_t i = 1 ; i < page_batch::nr_pages; i++) {
> +            assert(pb->pages[i] = pbuf.pop());
>          }
>          global_l2.free_page_batch(pb);
>      }
> @@ -1286,12 +1302,9 @@ void* l1::alloc_page_local()
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    if (pbuf.nr < pbuf.watermark_lo) {
> +    if (pbuf.size() < pbuf.watermark_lo) {
>          pbuf.wake_thread();
>      }
> -    if (pbuf.nr == 0) {
> -        return nullptr;
> -    }
>      return pbuf.pop();
>  }
>
> @@ -1299,14 +1312,10 @@ bool l1::free_page_local(void* v)
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    if (pbuf.nr > pbuf.watermark_hi) {
> +    if (pbuf.size() > pbuf.watermark_hi) {
>          pbuf.wake_thread();
>      }
> -    if (pbuf.nr == pbuf.max) {
> -        return false;
> -    }
> -    pbuf.push(v);
> -    return true;
> +    return pbuf.push(v);
>  }
>
>  // Global thread for L2 page pool
> --
> 2.7.4
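
The part of the change that is easy to follow is the new overflow behaviour:
with the old plain array, a push() past max silently scribbles over adjacent
memory, while a bounded ring can refuse the push and let the caller fall back
to the L2 pool. Below is a minimal stand-alone sketch of that contract, just
to check that I am reading the intent correctly. It is deliberately
simplified: plain ints instead of void* pages, no per-CPU placement, and none
of the lock-free machinery of the real ring_spsc, so treat it as an
illustration rather than the actual implementation.

#include <array>
#include <cassert>
#include <cstddef>

// Simplified stand-in for the bounded page buffer: push() reports failure
// when the buffer is full instead of writing past the end, and pop()
// reports failure when it is empty.  (Not the real ring_spsc -- this
// version is neither lock-free nor SPSC-safe.)
template <size_t Max>
struct bounded_ring {
    std::array<int, Max> slots;
    size_t head = 0;
    size_t count = 0;

    bool push(int v) {
        if (count == Max) {
            return false;               // full: caller falls back (e.g. to L2)
        }
        slots[(head + count) % Max] = v;
        ++count;
        return true;
    }
    bool pop(int& v) {
        if (count == 0) {
            return false;               // empty: caller falls back
        }
        v = slots[head];
        head = (head + 1) % Max;
        --count;
        return true;
    }
    size_t size() const { return count; }
};

int main()
{
    bounded_ring<4> ring;
    for (int i = 0; i < 4; i++) {
        assert(ring.push(i));
    }
    // With the old fixed array this fifth push would have overwritten
    // adjacent memory; here it just fails and the caller can react.
    assert(!ring.push(99));

    int v;
    assert(ring.pop(v) && v == 0);      // drains in FIFO order in this sketch
    return 0;
}

If that reading is correct, it also explains why free_page_local() can now
simply "return pbuf.push(v)" and why alloc_page_local() no longer needs the
explicit nr == 0 check before pop().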
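
On the fill thread itself, my reading of the new division of labour is:
alloc_page_local()/free_page_local() only wake the fill thread when a
watermark is crossed, and the fill thread alone drives the pool back toward
max / 2, one page batch per refill()/unfill() call. The model below uses
plain counters in place of real page batches and threads, and the batch size
of 32 is only my assumption for page_batch::nr_pages, so again this is an
illustration of the policy as I understand it, not the actual code.

#include <cstddef>
#include <cstdio>

// Rough model of the fill-thread policy from the patch: the pool is only
// topped up or drained in batch-sized steps, and always back toward
// max / 2.  The constants mirror the patch; batch_pages is an assumed
// value for page_batch::nr_pages.
constexpr size_t max_pages    = 512;
constexpr size_t watermark_lo = max_pages * 1 / 4;   // 128
constexpr size_t watermark_hi = max_pages * 3 / 4;   // 384
constexpr size_t batch_pages  = 32;                  // assumption

size_t pool = 0;    // stands in for pbuf.size()

void fill_thread_iteration()
{
    // The thread is only woken when a watermark has been crossed.
    if (pool < watermark_lo) {
        while (pool < max_pages / 2) {
            pool += batch_pages;        // refill(): one batch per call
        }
    }
    if (pool > watermark_hi) {
        while (pool > max_pages / 2) {
            pool -= batch_pages;        // unfill(): one batch per call
        }
    }
}

int main()
{
    pool = 40;                          // dipped below watermark_lo
    fill_thread_iteration();
    std::printf("after refill: %zu\n", pool);   // settles near max_pages / 2

    pool = 500;                         // climbed above watermark_hi
    fill_thread_iteration();
    std::printf("after unfill: %zu\n", pool);   // settles near max_pages / 2
    return 0;
}

If I am reading it right, the nice property is the hysteresis: the hot
alloc/free paths never loop, and the pool drifts between the 1/4 and 3/4
watermarks instead of thrashing around a single threshold.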
