Repository: impala
Updated Branches:
  refs/heads/2.x cefc212c5 -> 7097ee88d


IMPALA-4835: prerequisite buffer pool changes

The scanner/buffer pool changes will have different scanner
threads sharing the same buffer pool client. This requires that the
AllocateBuffer() API is safe to call concurrently from different
threads, which was true previously but not documented or tested.

This updates the comments and adds a couple of tests.

Change-Id: I8f2196722df59f2d367787c0550058022e296e24
Reviewed-on: http://gerrit.cloudera.org:8080/9097
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e0b3a48b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e0b3a48b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e0b3a48b

Branch: refs/heads/2.x
Commit: e0b3a48b42b0319c48cfdeafe6a77765406ee5bf
Parents: d24f23b
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Mon Jan 22 14:30:58 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Fri Feb 2 01:10:14 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/bufferpool/buffer-pool-test.cc | 136 +++++++++++++++++++++
 be/src/runtime/bufferpool/buffer-pool.h       |  15 ++-
 2 files changed, 145 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e0b3a48b/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 720dc13..0138a08 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -40,7 +40,9 @@
 #include "testutil/death-test-util.h"
 #include "testutil/gtest-util.h"
 #include "testutil/rand-util.h"
+#include "util/blocking-queue.h"
 #include "util/filesystem-util.h"
+#include "util/spinlock.h"
 #include "util/metrics.h"
 
 #include "common/names.h"
@@ -751,6 +753,78 @@ TEST_F(BufferPoolTest, BufferTransfer) {
   global_reservations_.Close();
 }
 
+TEST_F(BufferPoolTest, BufferTransferConcurrent) {
+  // Transfer buffers between threads in a circular fashion. Each client needs to have
+  // enough reservation for two buffers, since it may receive a buffer before handing
+  // off the next one.
+  const int NUM_CLIENTS = 5;
+  const int64_t TOTAL_MEM = NUM_CLIENTS * TEST_BUFFER_LEN * 2;
+  global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
+
+  BufferPool::ClientHandle clients[NUM_CLIENTS];
+  BufferPool::BufferHandle handles[NUM_CLIENTS];
+  SpinLock locks[NUM_CLIENTS]; // Each lock protects the corresponding BufferHandle.
+  for (int i = 0; i < NUM_CLIENTS; ++i) {
+    ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
+        TOTAL_MEM, NewProfile(), &clients[i]));
+    ASSERT_TRUE(clients[i].IncreaseReservationToFit(2 * TEST_BUFFER_LEN));
+  }
+
+  thread_group workers;
+
+  for (int thread_idx = 0; thread_idx < NUM_CLIENTS; ++thread_idx) {
+    workers.add_thread(new thread([&pool, &clients, &handles, &locks, thread_idx] {
+      // Transfer buffers around between the clients repeatedly in a circle.
+      BufferHandle handle;
+      {
+        lock_guard<SpinLock> l(locks[thread_idx]);
+        LOG(INFO) << "Allocate from " << (void*)&clients[thread_idx];
+        ASSERT_OK(pool.AllocateBuffer(
+              &clients[thread_idx], TEST_BUFFER_LEN, &handle));
+      }
+      for (int iter = 0; iter < 100; ++iter) {
+        int next_thread_idx = (thread_idx + 1) % NUM_CLIENTS;
+        // Transfer our buffer to the next thread.
+        {
+          unique_lock<SpinLock> l(locks[next_thread_idx]);
+          // Spin until we can add the handle.
+          while (true) {
+            if (!handles[next_thread_idx].is_open()) break;
+            l.unlock();
+            sched_yield();
+            l.lock();
+          }
+          ASSERT_TRUE(handle.is_open());
+          ASSERT_OK(pool.TransferBuffer(&clients[thread_idx], &handle,
+              &clients[next_thread_idx], &handles[next_thread_idx]));
+          // Check that the transfer left things in a consistent state.
+          ASSERT_TRUE(handles[next_thread_idx].is_open());
+          ASSERT_FALSE(handle.is_open());
+          ASSERT_GE(clients[next_thread_idx].GetUsedReservation(), TEST_BUFFER_LEN);
+        }
+        // Get a new buffer from the previous thread.
+        {
+          unique_lock<SpinLock> l(locks[thread_idx]);
+          // Spin until we receive a handle from the previous thread.
+          while (true) {
+            if (handles[thread_idx].is_open()) break;
+            l.unlock();
+            sched_yield();
+            l.lock();
+          }
+          handle = move(handles[thread_idx]);
+        }
+      }
+      pool.FreeBuffer(&clients[thread_idx], &handle);
+      }));
+  }
+  workers.join_all();
+  for (BufferPool::ClientHandle& client : clients) pool.DeregisterClient(&client);
+  ASSERT_EQ(global_reservations_.GetReservation(), 0);
+  global_reservations_.Close();
+}
+
 /// Test basic pinning and unpinning.
 TEST_F(BufferPoolTest, Pin) {
   int64_t total_mem = TEST_BUFFER_LEN * 1024;
@@ -2047,6 +2121,68 @@ TEST_F(BufferPoolTest, DecreaseReservation) {
   pool.DeregisterClient(&client);
   global_reservations_.Close();
 }
+
+// Test concurrent operations using the same client and different buffers.
+TEST_F(BufferPoolTest, ConcurrentBufferOperations) {
+  const int DELETE_THREADS = 2;
+  const int ALLOCATE_THREADS = 2;
+  const int NUM_ALLOCATIONS_PER_THREAD = 128;
+  const int MAX_NUM_BUFFERS = 16;
+  const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
+  BufferPool::ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", nullptr, &global_reservations_, nullptr,
+      TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservationToFit(TOTAL_MEM));
+
+  thread_group allocate_threads;
+  thread_group delete_threads;
+  AtomicInt64 available_reservation(TOTAL_MEM);
+
+  // Queue of buffers to be deleted, along with the first byte of the data in
+  // the buffer, for validation purposes.
+  BlockingQueue<pair<uint8_t, BufferHandle>> delete_queue(MAX_NUM_BUFFERS);
+
+  // Allocate threads allocate buffers whenever able and enqueue them.
+  for (int i = 0; i < ALLOCATE_THREADS; ++i) {
+    allocate_threads.add_thread(new thread([&] {
+        for (int i = 0; i < NUM_ALLOCATIONS_PER_THREAD; ++i) {
+          // Try to deduct reservation.
+          while (true) {
+            int64_t val = available_reservation.Load();
+            if (val >= TEST_BUFFER_LEN
+                && available_reservation.CompareAndSwap(val, val - TEST_BUFFER_LEN)) {
+              break;
+            }
+          }
+          BufferHandle buffer;
+          ASSERT_OK(pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &buffer));
+          uint8_t first_byte = static_cast<uint8_t>(i % 256);
+          buffer.data()[0] = first_byte;
+          delete_queue.BlockingPut(pair<uint8_t, BufferHandle>(first_byte, move(buffer)));
+        }
+        }));
+  }
+
+  // Delete threads pull buffers off the queue and free them.
+  for (int i = 0; i < DELETE_THREADS; ++i) {
+    delete_threads.add_thread(new thread([&] {
+          pair<uint8_t, BufferHandle> item;
+          while (delete_queue.BlockingGet(&item)) {
+            ASSERT_EQ(item.first, item.second.data()[0]);
+            pool.FreeBuffer(&client, &item.second);
+            available_reservation.Add(TEST_BUFFER_LEN);
+          }
+        }));
+
+  }
+  allocate_threads.join_all();
+  delete_queue.Shutdown();
+  delete_threads.join_all();
+  pool.DeregisterClient(&client);
+  global_reservations_.Close();
+}
 }
 
 int main(int argc, char** argv) {

http://git-wip-us.apache.org/repos/asf/impala/blob/e0b3a48b/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 86be6f9..5b98579 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -227,19 +227,21 @@ class BufferPool : public CacheLineAligned {
   /// pinned multiple times via 'page_handle'. May return an error if 'page_handle' was
   /// unpinned earlier with no subsequent GetBuffer() call and a read error is
   /// encountered while bringing the page back into memory.
-  Status ExtractBuffer(
-      ClientHandle* client, PageHandle* page_handle, BufferHandle* buffer_handle) WARN_UNUSED_RESULT;
+  Status ExtractBuffer(ClientHandle* client, PageHandle* page_handle,
+      BufferHandle* buffer_handle) WARN_UNUSED_RESULT;
 
   /// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. The caller
   /// is responsible for ensuring it has enough unused reservation before calling
   /// AllocateBuffer() (otherwise it will DCHECK). AllocateBuffer() only fails when
   /// a system error prevents the buffer pool from fulfilling the reservation.
+  /// Safe to call concurrently with any other operations for 'client', except for
+  /// operations on the same 'handle'.
   Status AllocateBuffer(
       ClientHandle* client, int64_t len, BufferHandle* handle) WARN_UNUSED_RESULT;
 
   /// If 'handle' is open, close 'handle', free the buffer and decrease the reservation
-  /// usage from 'client'. Idempotent. Safe to call concurrently with any other
-  /// operations for 'client'.
+  /// usage from 'client'. Idempotent. Safe to call concurrently with other operations
+  /// for 'client', except for operations on the same 'handle'.
   void FreeBuffer(ClientHandle* client, BufferHandle* handle);
 
   /// Transfer ownership of buffer from 'src_client' to 'dst_client' and move the
@@ -247,7 +249,8 @@ class BufferPool : public CacheLineAligned {
   /// decreases reservation usage in 'src_client'. 'src' must be open and 'dst' must be
   /// closed before calling. 'src'/'dst' and 'src_client'/'dst_client' must be different.
   /// After a successful call, 'src' is closed and 'dst' is open. Safe to call
-  /// concurrently with any other operations for 'src_client'.
+  /// concurrently with any other operations for 'src_client', except for operations
+  /// on the same handles.
   Status TransferBuffer(ClientHandle* src_client, BufferHandle* src,
       ClientHandle* dst_client, BufferHandle* dst) WARN_UNUSED_RESULT;
 
@@ -507,7 +510,7 @@ class BufferPool::PageHandle {
   DISALLOW_COPY_AND_ASSIGN(PageHandle);
   friend class BufferPool;
   friend class BufferPoolTest;
-  friend class Page;
+  friend struct Page;
 
   /// Internal helper to open the handle for the given page.
   void Open(Page* page, ClientHandle* client);

Reply via email to