Under high levels of memory page churn, the array of pages in the
l1 page buffer pool can overflow and overwrite the data in adjacent
memory pages.  This change replaces the array with a fixed-size ring.
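
For illustration only (this is not the actual OSv ring_spsc
implementation, and it omits the memory-ordering details a real
single-producer/single-consumer ring needs), a bounded ring refuses a
push when it is full instead of writing past the end of its storage,
which is what closes the overwrite described above:

    // Hypothetical bounded ring with the same push/pop/size contract the
    // patch relies on; the capacity is assumed to be a power of two.
    template <typename T, unsigned MaxSize>
    class bounded_ring {
        static_assert((MaxSize & (MaxSize - 1)) == 0,
                      "MaxSize must be a power of two");
    public:
        bool push(T item) {
            if (_end - _begin == MaxSize) {
                return false;   // full: the caller must cope, nothing is overwritten
            }
            _buf[_end++ % MaxSize] = item;
            return true;
        }
        bool pop(T& item) {
            if (_end == _begin) {
                return false;   // empty
            }
            item = _buf[_begin++ % MaxSize];
            return true;
        }
        unsigned size() const { return _end - _begin; }
    private:
        T _buf[MaxSize];
        unsigned _begin = 0;
        unsigned _end = 0;
    };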

It also removes the looping behavior from the refill and unfill
functions.  Those functions are called by multiple threads and now do
only the minimum amount of work necessary to allow their callers to
continue.  Previously, the looping behavior in refill caused the array
overflow through a race on the array size variable: while one caller
waited for a page batch, another could fill the same pool, and the
unchecked push then wrote past the end of the array.

The pool fill thread is now solely responsible for maintaining
the target pool size.
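
For reference, with the constants in the diff below this works out to:

    max          = 512
    watermark_lo = max * 1 / 4 = 128  (alloc path wakes the fill thread below this)
    watermark_hi = max * 3 / 4 = 384  (free path wakes the fill thread above this)
    target       = max / 2     = 256  (the fill thread refills or unfills back to this)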

Signed-off-by: Timmons C. Player <[email protected]>
---
 core/mempool.cc | 67 ++++++++++++++++++++++++++++++++-------------------------
 1 file changed, 38 insertions(+), 29 deletions(-)

diff --git a/core/mempool.cc b/core/mempool.cc
index 50c938f..fad046d 100644
--- a/core/mempool.cc
+++ b/core/mempool.cc
@@ -1099,9 +1099,12 @@ struct l1 {
     }
     static void* alloc_page_local();
     static bool free_page_local(void* v);
-    void* pop() { return _pages[--nr]; }
-    void push(void* page) { _pages[nr++] = page; }
-    void* top() { return _pages[nr - 1]; }
+    void* pop() {
+        void *page = nullptr;
+        return (_pages.pop(page) ? page : nullptr);
+    }
+    bool push(void *page) { return _pages.push(page); }
+    unsigned size() { return _pages.size(); }
     void wake_thread() { _fill_thread.wake(); }
     static void fill_thread();
     static void refill();
@@ -1110,11 +1113,12 @@ struct l1 {
     static constexpr size_t max = 512;
     static constexpr size_t watermark_lo = max * 1 / 4;
     static constexpr size_t watermark_hi = max * 3 / 4;
-    size_t nr = 0;
 
 private:
     sched::thread _fill_thread;
-    void* _pages[max];
+
+    typedef ring_spsc<void *, max> page_ring;
+    page_ring _pages;
 };
 
 struct page_batch {
@@ -1156,7 +1160,7 @@ public:
         page_batch* pb;
         while (!(pb = try_alloc_page_batch()) &&
                // Check again since someone else might change pbuf.nr when we sleep
-                (pbuf.nr + page_batch::nr_pages < pbuf.max / 2)) {
+                (pbuf.size() + page_batch::nr_pages < pbuf.max / 2)) {
             WITH_LOCK(migration_lock) {
                 DROP_LOCK(preempt_lock) {
                     refill();
@@ -1243,14 +1247,19 @@ void l1::fill_thread()
     for (;;) {
         sched::thread::wait_until([&] {
                 WITH_LOCK(preempt_lock) {
-                    return pbuf.nr < pbuf.watermark_lo || pbuf.nr > pbuf.watermark_hi;
+                    auto nr = pbuf.size();
+                    return nr < pbuf.watermark_lo || nr > pbuf.watermark_hi;
                 }
         });
-        if (pbuf.nr < pbuf.watermark_lo) {
-            refill();
+        if (pbuf.size() < pbuf.watermark_lo) {
+            while (pbuf.size() < pbuf.max / 2) {
+                refill();
+            }
         }
-        if (pbuf.nr > pbuf.watermark_hi) {
-            unfill();
+        if (pbuf.size() > pbuf.watermark_hi) {
+            while (pbuf.size() > pbuf.max / 2) {
+                unfill();
+            }
         }
     }
 }
@@ -1259,12 +1268,17 @@ void l1::refill()
 {
     SCOPE_LOCK(preempt_lock);
     auto& pbuf = get_l1();
-    while (pbuf.nr + page_batch::nr_pages < pbuf.max / 2) {
-        auto* pb = global_l2.alloc_page_batch(pbuf);
-        if (pb) {
+    auto* pb = global_l2.alloc_page_batch(pbuf);
+    if (pb) {
+        // Another thread might have filled the ring while we waited for
+        // the page batch.  Make sure there is enough room to add the pages
+        // we just acquired, otherwise return them.
+        if (pbuf.size() + page_batch::nr_pages <= pbuf.max) {
             for (auto& page : pb->pages) {
-                pbuf.push(page);
+                assert(pbuf.push(page));
             }
+        } else {
+            global_l2.free_page_batch(pb);
         }
     }
 }
@@ -1273,10 +1287,12 @@ void l1::unfill()
 {
     SCOPE_LOCK(preempt_lock);
     auto& pbuf = get_l1();
-    while (pbuf.nr > page_batch::nr_pages + pbuf.max / 2) {
-        auto* pb = static_cast<page_batch*>(pbuf.top());
-        for (size_t i = 0 ; i < page_batch::nr_pages; i++) {
-            pb->pages[i] = pbuf.pop();
+    // Don't bother unfilling unless we are actually above our target size.
+    if (pbuf.size() > pbuf.max / 2) {
+        auto* pb = static_cast<page_batch*>(pbuf.pop());
+        pb->pages[0] = pb;
+        for (size_t i = 1 ; i < page_batch::nr_pages; i++) {
+            assert(pb->pages[i] = pbuf.pop());
         }
         global_l2.free_page_batch(pb);
     }
@@ -1286,12 +1302,9 @@ void* l1::alloc_page_local()
 {
     SCOPE_LOCK(preempt_lock);
     auto& pbuf = get_l1();
-    if (pbuf.nr < pbuf.watermark_lo) {
+    if (pbuf.size() < pbuf.watermark_lo) {
         pbuf.wake_thread();
     }
-    if (pbuf.nr == 0) {
-        return nullptr;
-    }
     return pbuf.pop();
 }
 
@@ -1299,14 +1312,10 @@ bool l1::free_page_local(void* v)
 {
     SCOPE_LOCK(preempt_lock);
     auto& pbuf = get_l1();
-    if (pbuf.nr > pbuf.watermark_hi) {
+    if (pbuf.size() > pbuf.watermark_hi) {
         pbuf.wake_thread();
     }
-    if (pbuf.nr == pbuf.max) {
-        return false;
-    }
-    pbuf.push(v);
-    return true;
+    return pbuf.push(v);
 }
 
 // Global thread for L2 page pool
-- 
2.7.4
