Thanks for this patch. Looks serious, and I wonder if it explains https://github.com/cloudius-systems/osv/issues/755
Some comments and questions below.

On Fri, Sep 9, 2016 at 8:12 PM, Timmons C. Player <
timmons.pla...@spirent.com> wrote:

> Under high levels of memory page churn, the array of pages in the
> l1 page buffer pool can overflow and overwrite the data in adjacent
> memory pages. This change replaces the array with a fixed sized ring.
>
> It also removes the looping behavior found in the refill and unfill
> functions. Those functions are called by multiple threads and now
> only do the minimum amount of work necessary to allow the callers to
> continue. Previously, the looping behavior in the refill thread caused
> the array overflow due to a race on the array size variable.
>
> The pool fill thread is now solely responsible for maintaining
> the target pool size.
>
> Signed-off-by: Timmons C. Player <timmons.pla...@spirent.com>
> ---
>  core/mempool.cc | 67 ++++++++++++++++++++++++++++++++-------------------------
>  1 file changed, 38 insertions(+), 29 deletions(-)
>
> diff --git a/core/mempool.cc b/core/mempool.cc
> index 50c938f..fad046d 100644
> --- a/core/mempool.cc
> +++ b/core/mempool.cc
> @@ -1099,9 +1099,12 @@ struct l1 {
>      }
>      static void* alloc_page_local();
>      static bool free_page_local(void* v);
> -    void* pop() { return _pages[--nr]; }
> -    void push(void* page) { _pages[nr++] = page; }
> -    void* top() { return _pages[nr - 1]; }
> +    void* pop() {
> +        void *page = nullptr;

Nitpick (not important compared to the bigger comments below): I think
setting this to nullptr is redundant, and confusing (it led me to think
there was a reason it needed to be set). If you leave this default value in
anyway, you could do _pages.pop(page); return page; instead of the ?:
expression below. Maybe that's what you intended to do?

> +        return (_pages.pop(page) ? page : nullptr);
> +    }
> +    bool push(void *page) { return _pages.push(page); }
> +    unsigned size() { return _pages.size(); }
>      void wake_thread() { _fill_thread.wake(); }
>      static void fill_thread();
>      static void refill();
> @@ -1110,11 +1113,12 @@ struct l1 {
>      static constexpr size_t max = 512;
>      static constexpr size_t watermark_lo = max * 1 / 4;
>      static constexpr size_t watermark_hi = max * 3 / 4;
> -    size_t nr = 0;
>
>  private:
>      sched::thread _fill_thread;
> -    void* _pages[max];
> +
> +    typedef ring_spsc<void *, max> page_ring;
> +    page_ring _pages;

I don't understand why you needed to change that array to a ring_spsc.

First, about the boundary checking: sure, the push() and pop()
implementations look scary - not checking their boundaries (we should at
least have had an assert there...) - but the code which calls push(), for
example, is l1::free_page_local, and that would not call push() if the pool
had already reached its maximum size.

Second, the FIFO vs LIFO order - I assume that's not why you switched to
the ring.

So the most dramatic change is that you replaced the non-atomic array with
a ring_spsc, which allows two different threads to read and write this
buffer concurrently. But unless I'm misunderstanding something, the
intention of this code was that the so-called L1 page pool (the per-cpu
pool of pages-to-be-allocated) would only be handled by threads running on
a single CPU, and these threads use preemption-disable (preempt_lock) as a
way to ensure that the different consumers (callers of
l1::alloc_page_local()) and different producers (callers of
l1::free_page_local()) never run concurrently.

So unless I'm misunderstanding something, changing to ring_spsc is not
necessary here. We might instead have a different bug - somewhere where we
do not hold the preempt_lock long enough while modifying the buffer.
Looking forward in the code a bit, I suspect the DROP_LOCK in
alloc_page_batch(). But I'll continue reading your patch now...
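For reference, here is roughly what I understand the single-producer/
single-consumer ring semantics to be - a minimal sketch only, with my own
names and memory orderings, not OSv's actual ring_spsc implementation. The
point is that push() and pop() check their boundaries and report
full/empty via their return value, which the raw _pages[max] array did not:

```cpp
// Sketch of a bounded single-producer/single-consumer ring, loosely
// modeled on what ring_spsc<T, N> provides. Illustrative only.
#include <atomic>
#include <cstddef>

template <typename T, size_t N>
class spsc_ring {
public:
    // Producer side: refuses to overflow instead of scribbling past
    // the end of the buffer.
    bool push(const T& item) {
        size_t tail = _tail.load(std::memory_order_relaxed);
        size_t next = (tail + 1) % N;
        if (next == _head.load(std::memory_order_acquire)) {
            return false;   // full (one slot kept empty by design)
        }
        _buf[tail] = item;
        _tail.store(next, std::memory_order_release);
        return true;
    }
    // Consumer side: reports empty instead of returning stale data.
    bool pop(T& item) {
        size_t head = _head.load(std::memory_order_relaxed);
        if (head == _tail.load(std::memory_order_acquire)) {
            return false;   // empty
        }
        item = _buf[head];
        _head.store((head + 1) % N, std::memory_order_release);
        return true;
    }
    size_t size() const {
        size_t t = _tail.load(std::memory_order_acquire);
        size_t h = _head.load(std::memory_order_acquire);
        return (t + N - h) % N;
    }
private:
    T _buf[N];
    std::atomic<size_t> _head{0};
    std::atomic<size_t> _tail{0};
};
```

Note this only tolerates one producer thread and one consumer thread
running concurrently - which is why the preempt_lock question above
matters: if the preempt_lock discipline already serializes all accesses on
one CPU, the atomics buy us nothing.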
> };
>
>  struct page_batch {
> @@ -1156,7 +1160,7 @@ public:
>          page_batch* pb;
>          while (!(pb = try_alloc_page_batch()) &&
>              // Check again since someone else might change pbuf.nr when we sleep
> -            (pbuf.nr + page_batch::nr_pages < pbuf.max / 2)) {
> +            (pbuf.size() + page_batch::nr_pages < pbuf.max / 2)) {
>              WITH_LOCK(migration_lock) {
>                  DROP_LOCK(preempt_lock) {
>                      refill();
> @@ -1243,14 +1247,19 @@ void l1::fill_thread()
>      for (;;) {
>          sched::thread::wait_until([&] {
>              WITH_LOCK(preempt_lock) {
> -                return pbuf.nr < pbuf.watermark_lo || pbuf.nr > pbuf.watermark_hi;
> +                auto nr = pbuf.size();
> +                return nr < pbuf.watermark_lo || nr > pbuf.watermark_hi;
>              }
>          });
> -        if (pbuf.nr < pbuf.watermark_lo) {

Note (not very relevant to your patch): the fact that the original code
read pbuf.nr without the preempt_lock protection wasn't great, but I think
it is not a big problem: this thread is pinned to one CPU (so preempt_lock
isn't needed to protect against migration), and a concurrent
increment/decrement shouldn't cause a problem here, I think (not to mention
that inc/dec would be atomic on a single cpu).

> -            refill();
> +        if (pbuf.size() < pbuf.watermark_lo) {
> +            while (pbuf.size() < pbuf.max / 2) {
> +                refill();
> +            }
>          }
> -        if (pbuf.nr > pbuf.watermark_hi) {
> -            unfill();
> +        if (pbuf.size() > pbuf.watermark_hi) {
> +            while (pbuf.size() > pbuf.max / 2) {
> +                unfill();
> +            }
>          }
>      }
>  }
> @@ -1259,12 +1268,17 @@ void l1::refill()
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    while (pbuf.nr + page_batch::nr_pages < pbuf.max / 2) {
> -        auto* pb = global_l2.alloc_page_batch(pbuf);
> -        if (pb) {
> +    auto* pb = global_l2.alloc_page_batch(pbuf);

So alloc_page_batch dropped the preempt_lock momentarily (it did us the
courtesy of preventing migration of the thread to another CPU, but I
believe this is actually not necessary - we could have re-fetched get_l1()
after the alloc_page_batch and not needed to avoid migration).
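If I read the new fill_thread logic right, it now implements classic
watermark hysteresis: wake on the 1/4 and 3/4 marks, then loop back to the
max/2 target one batch at a time. A numeric sketch of just that control
flow (maintain() and the batch size of 32 are illustrative stand-ins, not
OSv code, and the real loops call refill()/unfill() which can fail):

```cpp
// Stand-alone model of the fill thread's hysteresis after this patch:
// below watermark_lo, refill batch-by-batch up to the target; above
// watermark_hi, unfill batch-by-batch down to the target.
#include <cstddef>

constexpr size_t max_pages    = 512;            // l1::max
constexpr size_t watermark_lo = max_pages / 4;  // 128
constexpr size_t watermark_hi = max_pages * 3 / 4;  // 384
constexpr size_t target       = max_pages / 2;  // 256
constexpr size_t batch        = 32;             // hypothetical batch size

// One pass of the fill thread's body, returning the pool size after
// working back toward the target.
size_t maintain(size_t nr) {
    if (nr < watermark_lo) {
        while (nr < target) {
            nr += batch;    // stands in for one refill() call
        }
    }
    if (nr > watermark_hi) {
        while (nr > target) {
            nr -= batch;    // stands in for one unfill() call
        }
    }
    return nr;
}
```

The gap between the wake-up watermarks and the max/2 target is what keeps
the thread from thrashing: a pool sitting anywhere between 128 and 384
pages triggers no work at all.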
Wasn't the code here the real bug, and your fix here the real fix? I mean,
won't this fix - testing the room in the array again after calling
alloc_page_batch() - work correctly without the rest of the changes moving
to a ring_spsc?

By the way, looking at the (what I thought was unnecessary) migration lock
in alloc_page_batch(), I am starting to think that passing pbuf to this
function is unnecessary: it seems pbuf is only used there for a test that
looks completely unnecessary to me, now that you do this test anyway, more
correctly, in the caller. But I guess we can consider this later.

> +    if (pb) {
> +        // Another thread might have filled the ring while we waited for
> +        // the page batch. Make sure there is enough room to add the pages
> +        // we just acquired, otherwise return them.

Note that the original code didn't intend to fill the entire array, but
rather only half of it; still, maybe it doesn't hurt to use the entire
batch if we already got it. So I think what you did is fine.

> +        if (pbuf.size() + page_batch::nr_pages <= pbuf.max) {
>              for (auto& page : pb->pages) {
> -                pbuf.push(page);
> +                assert(pbuf.push(page));

I wince when I see an assert with side effects (as you know, other
implementations, like KASSERT for example, compile out the entire assert on
release builds). But this is actually fine with OSv's assert(). Still, it
might be better style to rewrite this as two statements?

>              }
> +        } else {
> +            global_l2.free_page_batch(pb);
>          }
>      }
>  }
> @@ -1273,10 +1287,12 @@ void l1::unfill()
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    while (pbuf.nr > page_batch::nr_pages + pbuf.max / 2) {
> -        auto* pb = static_cast<page_batch*>(pbuf.top());
> -        for (size_t i = 0 ; i < page_batch::nr_pages; i++) {
> -            pb->pages[i] = pbuf.pop();
> +    // Don't bother unfilling unless we are actually above our target size.
> +    if (pbuf.size() > pbuf.max / 2) {

Here you changed the right-hand side of the inequality a bit. Why?
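On the assert-with-side-effects point above, here is the kind of rewrite I
mean - a sketch with a hypothetical push_to_pool() stand-in, not OSv code.
Under an assert() that compiles out on release builds (e.g. with -DNDEBUG),
the one-liner would silently drop the push itself; the two-statement form
keeps the side effect unconditional:

```cpp
// Demonstrates the two-statement alternative to assert(push(...)).
#include <cassert>
#include <vector>

static std::vector<int> pool;

// Hypothetical stand-in for pbuf.push(): fails when the pool is full.
bool push_to_pool(int page) {
    if (pool.size() >= 4) {
        return false;
    }
    pool.push_back(page);
    return true;
}

// Risky style: with NDEBUG defined, the entire expression - including
// the push - would vanish in implementations that compile asserts out:
//     assert(push_to_pool(page));
//
// Safer style: the side effect always runs; only the check is optional.
void checked_push(int page) {
    bool ok = push_to_pool(page);
    assert(ok);
    (void)ok;   // avoid an unused-variable warning under NDEBUG
}
```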
> +        auto* pb = static_cast<page_batch*>(pbuf.pop());
> +        pb->pages[0] = pb;
> +        for (size_t i = 1 ; i < page_batch::nr_pages; i++) {
> +            assert(pb->pages[i] = pbuf.pop());

Again, an assert with side effects - not very desirable (but it works
correctly on OSv).

>          }
>          global_l2.free_page_batch(pb);
>      }
> @@ -1286,12 +1302,9 @@ void* l1::alloc_page_local()
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    if (pbuf.nr < pbuf.watermark_lo) {
> +    if (pbuf.size() < pbuf.watermark_lo) {
>          pbuf.wake_thread();
>      }
> -    if (pbuf.nr == 0) {
> -        return nullptr;
> -    }
>      return pbuf.pop();
>  }
>
> @@ -1299,14 +1312,10 @@ bool l1::free_page_local(void* v)
>  {
>      SCOPE_LOCK(preempt_lock);
>      auto& pbuf = get_l1();
> -    if (pbuf.nr > pbuf.watermark_hi) {
> +    if (pbuf.size() > pbuf.watermark_hi) {
>          pbuf.wake_thread();
>      }
> -    if (pbuf.nr == pbuf.max) {
> -        return false;
> -    }
> -    pbuf.push(v);
> -    return true;
> +    return pbuf.push(v);
>  }
>
>  // Global thread for L2 page pool
> --
> 2.7.4
>
> --
> You received this message because you are subscribed to the Google Groups
> "OSv Development" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to osv-dev+unsubscr...@googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.