This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f7116173 IMPALA-12883: Support updating the charge on an entry in the cache
2f7116173 is described below

commit 2f7116173071082ea9cb28512894dc1fa537e9fe
Author: Michael Smith <michael.sm...@cloudera.com>
AuthorDate: Thu Mar 7 16:46:38 2024 -0800

    IMPALA-12883: Support updating the charge on an entry in the cache
    
    The cache implementation currently assumes that a cache
    entry's charge will remain constant over time and does
    not support updating the charge. This works well for
    existing cache users like the data cache and codegen
    cache. However, it doesn't work as well for use cases
    where the final size is not known up front. Being able
    to update the charge also allows for avoiding
    concurrency issues where two different threads are
    trying to insert the same entry.
    
    This adds the ability to update an entry's charge after
    it has been inserted into the cache. This can trigger
    evictions if the size increases. This also adds a way
    to retrieve the maximum charge supported by the cache.
    This allows a cache user to tune its behavior to avoid
    generating entries that would exceed the maximum charge.
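    
    A rough usage sketch for a cache user (illustrative only: the 'cache'
    pointer, key, and sizes below are hypothetical, error handling is
    omitted, and the pre-existing Allocate()/Insert() calls are assumed):
    
        // Allocate and insert an entry with a conservative initial charge.
        Slice key("some-key");
        Cache::UniquePendingHandle pending =
            cache->Allocate(key, /*val_len=*/1024, /*charge=*/1024);
        // ... write the value through cache->MutableValue(&pending) ...
        Cache::UniqueHandle handle =
            cache->Insert(std::move(pending), /*eviction_callback=*/nullptr);
    
        // Once the final size is known, update the charge. This can trigger
        // evictions if the charge grew, and it is a no-op if the entry has
        // already been evicted, so re-read the charge rather than assume it.
        size_t final_charge = 4096;
        if (final_charge <= cache->MaxCharge()) {
          cache->UpdateCharge(handle, final_charge);
        }
        size_t effective_charge = cache->Charge(handle);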
    
    Testing:
     - Added tests cases to the caching backend tests
    
    Change-Id: I34f54fb3a91a77821651c25d8d3bc3a2a3945025
    Reviewed-on: http://gerrit.cloudera.org:8080/21122
    Reviewed-by: Michael Smith <michael.sm...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Reviewed-by: Yida Wu <wydbaggio...@gmail.com>
---
 be/src/util/cache/cache-internal.h   |  38 ++++++-
 be/src/util/cache/cache-test.cc      |  81 ++++++++++++-
 be/src/util/cache/cache-test.h       |   5 +
 be/src/util/cache/cache.h            |  31 ++++-
 be/src/util/cache/lirs-cache-test.cc | 214 ++++++++++++++++++++++++++---------
 be/src/util/cache/lirs-cache.cc      |  49 +++++++-
 be/src/util/cache/rl-cache-test.cc   |  74 ++++++++++++
 be/src/util/cache/rl-cache.cc        | 113 +++++++++++++-----
 8 files changed, 514 insertions(+), 91 deletions(-)

diff --git a/be/src/util/cache/cache-internal.h b/be/src/util/cache/cache-internal.h
index 8432d321e..bb69ca27b 100644
--- a/be/src/util/cache/cache-internal.h
+++ b/be/src/util/cache/cache-internal.h
@@ -39,7 +39,9 @@ class HandleBase {
   //   It must be sized to fit the key and value with appropriate padding. See the
   //   description of kv_data_ptr_ for the memory layout and sizing.
   // - hash - hash of the key
-  HandleBase(uint8_t* kv_ptr, const Slice& key, int32_t hash, int val_len, int charge) {
+  HandleBase(uint8_t* kv_ptr, const Slice& key, int32_t hash, int val_len,
+      size_t charge) {
+    DCHECK_GE(charge, 0);
     next_handle_ = nullptr;
     key_length_ = key.size();
     val_length_ = val_len;
@@ -76,6 +78,10 @@ class HandleBase {
 
   size_t charge() const { return charge_; }
 
+  void set_charge(size_t charge) {
+    charge_ = charge;
+  }
+
  private:
   friend class HandleTable;
   HandleBase* next_handle_;
@@ -197,11 +203,12 @@ class CacheShard {
   // Initialization function. Must be called before any other function.
   virtual Status Init() = 0;
 
-  virtual HandleBase* Allocate(Slice key, uint32_t hash, int val_len, int charge) = 0;
+  virtual HandleBase* Allocate(Slice key, uint32_t hash, int val_len, size_t charge) = 0;
   virtual void Free(HandleBase* handle) = 0;
   virtual HandleBase* Insert(HandleBase* handle,
       Cache::EvictionCallback* eviction_callback) = 0;
+  virtual HandleBase* Lookup(const Slice& key, uint32_t hash, bool no_updates) = 0;
+  virtual void UpdateCharge(HandleBase* handle, size_t new_charge) = 0;
   virtual void Release(HandleBase* handle) = 0;
   virtual void Erase(const Slice& key, uint32_t hash) = 0;
   virtual size_t Invalidate(const Cache::InvalidationControl& ctl) = 0;
@@ -248,6 +255,10 @@ class ShardedCache : public Cache {
     for (int s = 0; s < num_shards; ++s) {
       shards_.push_back(NewCacheShard(policy, mem_tracker_.get(), per_shard));
     }
+
+    if (per_shard < shard_charge_limit_) {
+      shard_charge_limit_ = static_cast<size_t>(per_shard);
+    }
   }
 
   virtual ~ShardedCache() {
@@ -267,6 +278,18 @@ class ShardedCache : public Cache {
     return UniqueHandle(reinterpret_cast<Cache::Handle*>(h), Cache::HandleDeleter(this));
   }
 
+  size_t Charge(const UniqueHandle& handle) override {
+    return reinterpret_cast<HandleBase*>(handle.get())->charge();
+  }
+
+  void UpdateCharge(const UniqueHandle& handle, size_t charge) override {
+    HandleBase* h = reinterpret_cast<HandleBase*>(handle.get());
+    shards_[Shard(h->hash())]->UpdateCharge(h, charge);
+    // NOTE: the handle could point to an entry that has been evicted, so there is no
+    // guarantee that the charge will actually change. That's why we don't assert that
+    // h->charge() == charge here.
+  }
+
   void Erase(const Slice& key) override {
     const uint32_t hash = HashSlice(key);
     shards_[Shard(hash)]->Erase(key, hash);
@@ -288,7 +311,11 @@ class ShardedCache : public Cache {
         Cache::HandleDeleter(this));
   }
 
-  UniquePendingHandle Allocate(Slice key, int val_len, int charge) override {
+  size_t MaxCharge() const override {
+    return shard_charge_limit_;
+  }
+
+  UniquePendingHandle Allocate(Slice key, int val_len, size_t charge) override {
     const uint32_t hash = HashSlice(key);
     HandleBase* handle = shards_[Shard(hash)]->Allocate(key, hash, val_len, charge);
     UniquePendingHandle h(reinterpret_cast<PendingHandle*>(handle),
@@ -297,7 +324,7 @@ class ShardedCache : public Cache {
     return h;
   }
 
-  uint8_t* MutableValue(UniquePendingHandle* handle) override {
+  uint8_t* MutableValue(UniquePendingHandle* handle) const override {
     return reinterpret_cast<HandleBase*>(handle->get())->mutable_val_ptr();
   }
 
@@ -338,6 +365,9 @@ class ShardedCache : public Cache {
   // Number of bits of hash used to determine the shard.
   const int shard_bits_;
 
+  // Max charge that can be stored in a shard.
+  size_t shard_charge_limit_ = numeric_limits<size_t>::max();
+
   static inline uint32_t HashSlice(const Slice& s) {
     return util_hash::CityHash64(reinterpret_cast<const char *>(s.data()), s.size());
   }
diff --git a/be/src/util/cache/cache-test.cc b/be/src/util/cache/cache-test.cc
index 708cc5784..2eebdec44 100644
--- a/be/src/util/cache/cache-test.cc
+++ b/be/src/util/cache/cache-test.cc
@@ -91,9 +91,11 @@ TEST_P(CacheTest, TrackMemory) {
   if (mem_tracker_) {
     Insert(100, 100, 1);
     ASSERT_EQ(1, mem_tracker_->consumption());
+    UpdateCharge(100, 2);
+    ASSERT_EQ(2, mem_tracker_->consumption());
     Erase(100);
     ASSERT_EQ(0, mem_tracker_->consumption());
-    ASSERT_EQ(1, mem_tracker_->peak_consumption());
+    ASSERT_EQ(2, mem_tracker_->peak_consumption());
   }
 }
 
@@ -164,6 +166,43 @@ TEST_P(CacheTest, EntriesArePinned) {
   ASSERT_EQ(102, evicted_values_[1]);
 }
 
+TEST_P(CacheTest, UpdateChargeCausesEviction) {
+  // Canary entries that could be evicted. Insert 1000 entries so
+  // that each shard would have some entries.
+  for (int i = 0; i < 1000; ++i) {
+    Insert(1000 + i, 1000 + i);
+  }
+
+  Insert(100, 100);
+  auto h1 = cache_->Lookup(EncodeInt(100));
+  ASSERT_NE(h1, nullptr);
+
+  // Updating the charge for the cache entry evicts something
+  cache_->UpdateCharge(h1, cache_->MaxCharge());
+  ASSERT_GT(evicted_keys_.size(), 0);
+  ASSERT_GT(evicted_values_.size(), 0);
+}
+
+TEST_P(CacheTest, UpdateChargeForErased) {
+  // Insert 1000 entries so that each shard would have some entries.
+  for (int i = 0; i < 1000; ++i) {
+    Insert(1000 + i, 1000 + i);
+  }
+
+  Insert(100, 100);
+  auto h1 = cache_->Lookup(EncodeInt(100));
+  ASSERT_NE(h1, nullptr);
+
+  Erase(100);
+
+  // Updating the charge for the erased element doesn't do anything,
+  // because the entry has been erased.
+  cache_->UpdateCharge(h1, cache_->MaxCharge());
+
+  ASSERT_EQ(evicted_keys_.size(), 0);
+  ASSERT_EQ(evicted_values_.size(), 0);
+}
+
 // Add a bunch of light and heavy entries and then count the combined
 // size of items still in the cache, which must be approximately the
 // same as the total capacity.
@@ -191,5 +230,45 @@ TEST_P(CacheTest, HeavyEntries) {
   ASSERT_LE(cached_weight, cache_size() + cache_size() / 10);
 }
 
+TEST_P(CacheTest, UpdateHeavyEntries) {
+  const int kLight = cache_size() / 1000;
+  const int kHeavy = cache_size() / 100;
+  int added = 0;
+  int index = 0;
+  while (added < 2 * cache_size()) {
+    Insert(index, 1000+index, kLight);
+    const int weight = (index & 1) ? kLight : kHeavy;
+    UpdateCharge(index, weight);
+    added += weight;
+    ++index;
+  }
+
+  int cached_weight = 0;
+  for (int i = 0; i < index; ++i) {
+    const int weight = (i & 1 ? kLight : kHeavy);
+    int r = Lookup(i);
+    if (r >= 0) {
+      cached_weight += weight;
+      ASSERT_EQ(1000+i, r);
+    }
+  }
+  ASSERT_LE(cached_weight, cache_size() + cache_size() / 10);
+  // FIFO won't use space very efficiently with alternating light/heavy entries.
+  ASSERT_GE(cached_weight, cache_size() / 2);
+}
+
+TEST_P(CacheTest, MaxCharge) {
+  // The default size of the cache is smaller than SIZE_MAX, so the max
+  // charge will be related to the cache size.
+  int max_charge = cache_->MaxCharge();
+  if (FLAGS_cache_force_single_shard) {
+    ASSERT_EQ(max_charge, cache_size());
+  } else {
+    // Split across multiple shards (# of CPUs), but we don't specifically
+    // know how many.
+    ASSERT_LE(max_charge, cache_size());
+  }
+}
+
 }  // namespace impala
 
diff --git a/be/src/util/cache/cache-test.h b/be/src/util/cache/cache-test.h
index 279c2f983..80c125c50 100644
--- a/be/src/util/cache/cache-test.h
+++ b/be/src/util/cache/cache-test.h
@@ -66,6 +66,11 @@ class CacheBaseTest : public ::testing::Test,
     return handle ? DecodeInt(cache_->Value(handle)) : -1;
   }
 
+  void UpdateCharge(int key, int charge) {
+    auto handle(cache_->Lookup(EncodeInt(key), Cache::NO_UPDATE));
+    cache_->UpdateCharge(handle, charge);
+  }
+
   // Insert a key with the given value and charge. Return whether the insert was
   // successful.
   bool Insert(int key, int value, int charge = 1) {
diff --git a/be/src/util/cache/cache.h b/be/src/util/cache/cache.h
index dabbb22e3..5571ba2e5 100644
--- a/be/src/util/cache/cache.h
+++ b/be/src/util/cache/cache.h
@@ -22,6 +22,7 @@
 #include <cstdint>
 #include <functional>
 #include <iosfwd>
+#include <limits>
 #include <memory>
 #include <string>
 #include <utility>
@@ -92,8 +93,14 @@ class Cache {
         : c_(c) {
     }
 
+    // It is useful to have a constructor without arguments, as
+    // this makes it easier to embed in containers.
+    explicit HandleDeleter()
+      : c_(nullptr) {}
+
     void operator()(Cache::Handle* h) const {
       if (h != nullptr) {
+        DCHECK(c_ != nullptr);
         c_->Release(h);
       }
     }
@@ -120,8 +127,14 @@ class Cache {
         : c_(c) {
     }
 
+    // It is useful to have a constructor without arguments, as
+    // this makes it easier to embed in containers.
+    explicit PendingHandleDeleter()
+      : c_(nullptr) {}
+
     void operator()(Cache::PendingHandle* h) const {
       if (h != nullptr) {
+        DCHECK(c_ != nullptr);
         c_->Free(h);
       }
     }
@@ -178,6 +191,13 @@ class Cache {
   //
   virtual UniqueHandle Lookup(const Slice& key, LookupBehavior behavior = NORMAL) = 0;
 
+  // Return the charge encapsulated in a handle returned by a successful
+  // Lookup().
+  virtual size_t Charge(const UniqueHandle& handle) = 0;
+
+  // Update the charge of a handle after insertion.
+  virtual void UpdateCharge(const UniqueHandle& handle, size_t charge) = 0;
+
   // If the cache contains entry for key, erase it.  Note that the
   // underlying entry will be kept around until all existing handles
   // to it have been released.
@@ -214,7 +234,11 @@ class Cache {
 
   // Indicates that the charge of an item in the cache should be calculated
   // based on its memory consumption.
-  static constexpr int kAutomaticCharge = -1;
+  static constexpr size_t kAutomaticCharge = std::numeric_limits<size_t>::max();
+
+  // Returns the maximum charge that will be accepted by this cache.
+  // Can be used to avoid generating data that we won't be able to insert.
+  virtual size_t MaxCharge() const = 0;
 
   // Allocate space for a new entry to be inserted into the cache.
   //
@@ -237,7 +261,7 @@ class Cache {
   // space for the requested allocation.
   //
   // The returned handle owns the allocated memory.
-  virtual UniquePendingHandle Allocate(Slice key, int val_len, int charge) = 0;
+  virtual UniquePendingHandle Allocate(Slice key, int val_len, size_t charge) = 0;
 
   // Default 'charge' should be kAutomaticCharge
   // (default arguments on virtual functions are prohibited).
@@ -245,7 +269,8 @@ class Cache {
     return Allocate(key, val_len, kAutomaticCharge);
   }
 
-  virtual uint8_t* MutableValue(UniquePendingHandle* handle) = 0;
+  // Get the mutable space created by Allocate.
+  virtual uint8_t* MutableValue(UniquePendingHandle* handle) const = 0;
 
   // Commit a prepared entry into the cache.
   //
diff --git a/be/src/util/cache/lirs-cache-test.cc b/be/src/util/cache/lirs-cache-test.cc
index 9753352e9..1f930f001 100644
--- a/be/src/util/cache/lirs-cache-test.cc
+++ b/be/src/util/cache/lirs-cache-test.cc
@@ -93,10 +93,9 @@ class LIRSCacheTest : public CacheBaseTest {
   }
 };
 
+// This tests a very simple case: insert entries, then flush the cache and verify
+// entries are evicted in the appropriate order.
 TEST_F(LIRSCacheTest, BasicEvictionOrdering) {
-  // This tests a very simple case: insert entries, then flush the cache and verify
-  // entries are evicted in the appropriate order.
-
   FillCache();
   FlushCache();
 
@@ -112,11 +111,10 @@ TEST_F(LIRSCacheTest, BasicEvictionOrdering) {
   }
 }
 
+// Lookup operations can be tagged as NO_UPDATE, in which case, nothing should change
+// priority. Verify that adding NO_UPDATE Lookups to the basic case does not change
+// the eviction order because nothing moves.
 TEST_F(LIRSCacheTest, LookupNoUpdate) {
-  // Lookup operations can be tagged as NO_UPDATE, in which case, nothing should change
-  // priority. Verify that adding NO_UPDATE Lookups to the basic case does not change
-  // the eviction order because nothing moves.
-
   FillCache();
 
   // Do a NO_UPDATE lookup on an unprotected element (which otherwise would become
@@ -141,8 +139,8 @@ TEST_F(LIRSCacheTest, LookupNoUpdate) {
   }
 }
 
+// Test that Allocate() rejects anything larger than the protected capacity (which is 95)
 TEST_F(LIRSCacheTest, RejectLarge) {
-  // Allocate() rejects anything larger than the protected capacity (which is 95)
   // Insert() returns false if Allocate() fails.
   ASSERT_FALSE(Insert(100, 100, 96));
   ASSERT_EQ(evicted_keys_.size(), 0);
@@ -154,9 +152,9 @@ TEST_F(LIRSCacheTest, RejectLarge) {
   ASSERT_EQ(evicted_values_.size(), 0);
 }
 
+// This tests that inserting a single large element can evict all of the UNPROTECTED
+// elements, along with itself.
 TEST_F(LIRSCacheTest, LargeInsertUnprotectedEvict) {
-  // This tests that inserting a single large element can evict all of the UNPROTECTED
-  // elements, along with itself.
   FillCache();
   ASSERT_FALSE(Insert(100, 100, 6));
   // All 5 UNPROTECTED got evicted, along with the element being inserted.
@@ -180,10 +178,34 @@ TEST_F(LIRSCacheTest, LargeInsertUnprotectedEvict) {
   }
 }
 
-TEST_F(LIRSCacheTest, LargeProtectedToUnprotectedEvict) {
-  // This tests that a PROTECTED element that is larger than the unprotected capacity
-  // will evict everything including itself when it transitions to UNPROTECTED.
+// This tests that updating a single large element evicts all UNPROTECTED elements
+TEST_F(LIRSCacheTest, LargeUpdateUnprotectedEvict) {
+  FillCache();
+  UpdateCharge(99, 6);
+  // All 5 UNPROTECTED got evicted, along with the element being updated.
+  ASSERT_EQ(evicted_keys_.size(), 5);
+  ASSERT_EQ(evicted_values_.size(), 5);
+  for (int i = 0; i < 5; ++i) {
+    ASSERT_EQ(evicted_keys_[i], 95+i);
+    ASSERT_EQ(evicted_values_[i], 95+i);
+  }
+  evicted_keys_.clear();
+  evicted_values_.clear();
 
+  FlushCache();
+
+  // Only the protected remain, and they are all in order
+  ASSERT_EQ(evicted_keys_.size(), 95);
+  ASSERT_EQ(evicted_values_.size(), 95);
+  for (int i = 0; i < 95; ++i) {
+    ASSERT_EQ(evicted_keys_[i], i);
+    ASSERT_EQ(evicted_values_[i], i);
+  }
+}
+
+// This tests that a PROTECTED element that is larger than the unprotected capacity
+// will evict everything including itself when it transitions to UNPROTECTED.
+TEST_F(LIRSCacheTest, LargeProtectedToUnprotectedEvict) {
   // Insert PROTECTED element that is larger than the unprotected capacity
   ASSERT_TRUE(Insert(0, 0, 95));
   ASSERT_EQ(evicted_keys_.size(), 0);
@@ -213,11 +235,10 @@ TEST_F(LIRSCacheTest, LargeProtectedToUnprotectedEvict) {
   ASSERT_EQ(evicted_values_[4], 0);
 }
 
+// Large unprotected entries can transition directly to being a TOMBSTONE on Insert().
+// This test verifies that this TOMBSTONE entry (which is larger than the unprotected
+// capacity) can be promoted to be PROTECTED if there is another Insert().
 TEST_F(LIRSCacheTest, LargeTombstone) {
-  // Large unprotected entries can transition directly to being a TOMBSTONE on Insert().
-  // This test verifies that this TOMBSTONE entry (which is larger than the unprotected
-  // capacity) can be promoted to be PROTECTED if there is another Insert().
-
   // One protected element
   ASSERT_TRUE(Insert(0, 0, 95));
   ASSERT_EQ(Lookup(0), 0);
@@ -242,11 +263,38 @@ TEST_F(LIRSCacheTest, LargeTombstone) {
   ASSERT_EQ(Lookup(0), -1);
 }
 
-TEST_F(LIRSCacheTest, InsertExistingUnprotected) {
-  // This tests the behavior of insert when there is already an unprotected element with
-  // the same key. It should replace the existing value, but the new element should
-  // continue to be unprotected.
+// A client could hold a handle for an entry that gets evicted and becomes a TOMBSTONE
+// entry. This test verifies that if they call UpdateCharge() on a TOMBSTONE entry,
+// then nothing happens.
+TEST_F(LIRSCacheTest, UpdateChargeTombstone) {
+  FillCache();
+
+  // Get a handle to an entry
+  auto handle(cache_->Lookup(EncodeInt(1), Cache::NO_UPDATE));
+
+  // Flush the cache
+  FlushCache();
+
+  // Verify our key has been evicted. It can't be found via Lookup().
+  ASSERT_EQ(Lookup(1, Cache::NO_UPDATE), -1);
+
+  evicted_keys_.clear();
+  evicted_values_.clear();
+
+  // Try to update the charge for our tombstone entry
+  cache_->UpdateCharge(handle, cache_->MaxCharge());
+  // The charge doesn't actually get updated
+  ASSERT_EQ(cache_->Charge(handle), 1);
+
+  // Nothing gets evicted.
+  ASSERT_EQ(evicted_keys_.size(), 0);
+  ASSERT_EQ(evicted_values_.size(), 0);
+}
 
+// This tests the behavior of insert when there is already an unprotected element with
+// the same key. It should replace the existing value, but the new element should
+// continue to be unprotected.
+TEST_F(LIRSCacheTest, InsertExistingUnprotected) {
   FillCache();
 
   // Replace an unprotected key with a new value
@@ -279,10 +327,9 @@ TEST_F(LIRSCacheTest, InsertExistingUnprotected) {
   }
 }
 
+// This is the same as InsertExistingUnprotected, except that it is verifying that
+// replacing an existing protected key will remain protected.
 TEST_F(LIRSCacheTest, InsertExistingProtected) {
-  // This is the same as InsertExistingUnprotected, except that it is verifying that
-  // replacing an existing protected key will remain protected.
-
   FillCache();
 
   // Replace a protected key with a new value
@@ -314,11 +361,10 @@ TEST_F(LIRSCacheTest, InsertExistingProtected) {
   }
 }
 
+// This is the same as InsertExistingProtected, except that it is verifying that
+// replacing an existing protected key that is the last entry on the recency
+// list will trim the recency list.
 TEST_F(LIRSCacheTest, InsertExistingProtectedNeedsTrim) {
-  // This is the same as InsertExistingProtected, except that it is verifying that
-  // replacing an existing protected key that is the last entry on the recency
-  // list will trim the recency list.
-
   FillCache();
 
   // Lookup every protected value except #25
@@ -364,10 +410,78 @@ TEST_F(LIRSCacheTest, InsertExistingProtectedNeedsTrim) {
   }
 }
 
-TEST_F(LIRSCacheTest, UnprotectedToProtected) {
-  // This tests the behavior of lookup of an unprotected key that is more recent than
-  // the oldest protected key (i.e. it should be promoted to be protected).
+// This tests the behavior of UpdateCharge on an UNPROTECTED element. It should update
+// the charge, but the element should continue to be unprotected.
+TEST_F(LIRSCacheTest, UpdateExistingUnprotected) {
+  FillCache();
 
+  // Increase the charge
+  UpdateCharge(96, 2);
+  ASSERT_EQ(1, evicted_keys_.size());
+  ASSERT_EQ(evicted_keys_[0], 95);
+  ASSERT_EQ(evicted_values_[0], 95);
+  evicted_keys_.clear();
+  evicted_values_.clear();
+
+  FlushCache();
+
+  // The only thing we guarantee is that key 96 is still unprotected. None of the other
+  // unprotected elements should have moved around, but the ordering is not particularly
+  // important, so this only verifies that the values are still around.
+  for (int i = 0; i < 4; ++i) {
+    ASSERT_LT(evicted_keys_[i], 100);
+    ASSERT_GE(evicted_keys_[i], 96);
+  }
+  // There were 95 protected elements (0-94). They are not impacted, and they are still
+  // evicted in order.
+  for (int i = 4; i < 99; ++i) {
+    ASSERT_EQ(evicted_keys_[i], i-4);
+    ASSERT_EQ(evicted_values_[i], i-4);
+  }
+}
+
+// This is the same as UpdateExistingUnprotected, except that it is verifying that
+// a protected key whose charge is updated remains protected, and that evictions cascade.
+TEST_F(LIRSCacheTest, UpdateExistingProtected) {
+  FillCache();
+
+  // Increase the charge on a protected key, pushing a protected key to unprotected and
+  // evicting an unprotected key.
+  UpdateCharge(25, 2);
+  ASSERT_EQ(1, evicted_keys_.size());
+  ASSERT_EQ(evicted_keys_[0], 95);
+  ASSERT_EQ(evicted_values_[0], 95);
+  evicted_keys_.clear();
+  evicted_values_.clear();
+
+  // Evict all unprotected keys.
+  ASSERT_FALSE(Insert(100, 100, 6));
+  ASSERT_EQ(6, evicted_keys_.size());
+  for (int i = 0; i < 4; ++i) {
+    ASSERT_EQ(evicted_keys_[i], 96+i);
+    ASSERT_EQ(evicted_values_[i], 96+i);
+  }
+  // The last evicted element before the Insert was protected.
+  ASSERT_EQ(evicted_keys_[4], 0);
+  ASSERT_EQ(evicted_values_[4], 0);
+  ASSERT_EQ(evicted_keys_[5], 100);
+  ASSERT_EQ(evicted_values_[5], 100);
+  evicted_keys_.clear();
+  evicted_values_.clear();
+
+  FlushCache();
+
+  // None of the other protected elements moved around, but the exact ordering is not
+  // specified.
+  for (int i = 0; i < 94; ++i) {
+    ASSERT_LT(evicted_keys_[i], 95);
+    ASSERT_GE(evicted_keys_[i], 0);
+  }
+}
+
+// This tests the behavior of lookup of an unprotected key that is more recent than
+// the oldest protected key (i.e. it should be promoted to be protected).
+TEST_F(LIRSCacheTest, UnprotectedToProtected) {
   FillCache();
 
   // If we lookup 95, it will move from unprotected to protected.
@@ -395,10 +509,9 @@ TEST_F(LIRSCacheTest, UnprotectedToProtected) {
   }
 }
 
+// This tests the behavior of insert for a key that has a tombstone element in the
+// cache. It should insert the new element as a protected element.
 TEST_F(LIRSCacheTest, TombstoneToProtected) {
-  // This tests the behavior of insert for a key that has a tombstone element in the
-  // cache. It should insert the new element as a protected element.
-
   FillCache();
 
   // Add one more element, which will evict element 95. It is now a tombstone.
@@ -435,8 +548,10 @@ TEST_F(LIRSCacheTest, TombstoneToProtected) {
   }
 }
 
+// This tests the case where there is a lookup of an unprotected element that has been
+// trimmed from the recency queue. In this case, the unprotected element remains
+// unprotected.
 TEST_F(LIRSCacheTest, UnprotectedToUnprotected) {
-
   FillCache();
 
   // If we lookup every element that is protected, then the unprotected elements
@@ -468,9 +583,9 @@ TEST_F(LIRSCacheTest, UnprotectedToUnprotected) {
   }
 }
 
+// This tests the edge case where there is exactly one unprotected element
+// and that element is looked up and remains unprotected.
 TEST_F(LIRSCacheTest, ExactlyOneUnprotectedToUnprotected) {
-  // This tests the edge case where there is exactly one unprotected element
-  // and that element is looked up and remains unprotected.
   FillCache();
 
   // If we lookup every element that is protected, then the unprotected elements
@@ -507,9 +622,8 @@ TEST_F(LIRSCacheTest, ExactlyOneUnprotectedToUnprotected) {
   }
 }
 
+// This tests that Erase works for both unprotected and protected elements.
 TEST_F(LIRSCacheTest, Erase) {
-  // This tests that Erase works for both unprotected and protected elements.
-
   FillCache();
 
   // Erase a protected element
@@ -545,11 +659,10 @@ TEST_F(LIRSCacheTest, Erase) {
   }
 }
 
+// This is the same as InsertExistingProtectedNeedsTrim, except that it is verifying
+// that erasing the last protected key on the recency list will trim the recency
+// list.
 TEST_F(LIRSCacheTest, EraseNeedsTrim) {
-  // This is the same as InsertExistingProtectedNeedsTrim, except that it is verifying
-  // that erasing the last protected key on the recency list will trim the recency
-  // list.
-
   FillCache();
 
   // Lookup every protected value except #25
@@ -592,10 +705,9 @@ TEST_F(LIRSCacheTest, EraseNeedsTrim) {
   }
 }
 
+// This tests the enforcement of the tombstone limit. The lirs_tombstone_multiple is
+// 2.0, so we expect a cache with 100 elements to maintain at most 200 tombstones.
 TEST_F(LIRSCacheTest, TombstoneLimit1) {
-  // This tests the enforcement of the tombstone limit. The lirs_tombstone_multiple is
-  // 2.0, so we expect a cache with 100 elements to maintain at most 200 tombstones.
-
   // Fill the cache to the point where there are 100 normal elements and 200 tombstones.
   FillCacheToTombstoneLimit();
 
@@ -632,10 +744,9 @@ TEST_F(LIRSCacheTest, TombstoneLimit1) {
   }
 }
 
+// This tests the enforcement of the tombstone limit. The lirs_tombstone_multiple is
+// 2.0, so we expect a cache with 100 elements to maintain at most 200 tombstones.
 TEST_F(LIRSCacheTest, TombstoneLimit2) {
-  // This tests the enforcement of the tombstone limit. The lirs_tombstone_multiple is
-  // 2.0, so we expect a cache with 100 elements to maintain at most 200 tombstones.
-
   // Fill the cache to the point where there are 100 normal elements and 200 tombstones.
   FillCacheToTombstoneLimit();
 
@@ -674,10 +785,9 @@ TEST_F(LIRSCacheTest, TombstoneLimit2) {
   ASSERT_EQ(evicted_values_[99], 96);
 }
 
+// This tests the enforcement of the tombstone limit. This is a simple test that
+// verifies we can free multiple tombstones at once.
 TEST_F(LIRSCacheTest, TombstoneLimitFreeMultiple) {
-  // This tests the enforcement of the tombstone limit. This is a simple test that
-  // verifies we can free multiple tombstones at once.
-
   // Fill the cache to the point where there are 100 normal elements and 200 tombstones.
   FillCacheToTombstoneLimit();
 
diff --git a/be/src/util/cache/lirs-cache.cc b/be/src/util/cache/lirs-cache.cc
index 20f62db59..2337d1d7f 100644
--- a/be/src/util/cache/lirs-cache.cc
+++ b/be/src/util/cache/lirs-cache.cc
@@ -322,11 +322,12 @@ class LIRSCacheShard : public CacheShard {
   ~LIRSCacheShard();
 
   Status Init() override;
-  HandleBase* Allocate(Slice key, uint32_t hash, int val_len, int charge) override;
+  HandleBase* Allocate(Slice key, uint32_t hash, int val_len, size_t charge) override;
   void Free(HandleBase* handle) override;
   HandleBase* Insert(HandleBase* handle,
       Cache::EvictionCallback* eviction_callback) override;
   HandleBase* Lookup(const Slice& key, uint32_t hash, bool no_updates) override;
+  void UpdateCharge(HandleBase* handle, size_t charge) override;
   void Release(HandleBase* handle) override;
   void Erase(const Slice& key, uint32_t hash) override;
   size_t Invalidate(const Cache::InvalidationControl& ctl) override;
@@ -861,7 +862,8 @@ void LIRSCacheShard::ToUninitialized(LIRSThreadState* tstate, LIRSHandle* e,
   }
 }
 
-HandleBase* LIRSCacheShard::Allocate(Slice key, uint32_t hash, int val_len, int charge) {
+HandleBase* LIRSCacheShard::Allocate(Slice key, uint32_t hash, int val_len,
+    size_t charge) {
   DCHECK(initialized_);
   int key_len = key.size();
   DCHECK_GE(key_len, 0);
@@ -940,7 +942,7 @@ HandleBase* LIRSCacheShard::Lookup(const Slice& key, uint32_t hash,
         }
         break;
       default:
-        CHECK(false);
+        CHECK(false) << "Unexpected state for Lookup: " << e->state();
       }
     }
   }
@@ -951,6 +953,47 @@ HandleBase* LIRSCacheShard::Lookup(const Slice& key, uint32_t hash,
   return e;
 }
 
+void LIRSCacheShard::UpdateCharge(HandleBase* handle, size_t charge) {
+  DCHECK(initialized_);
+  LIRSThreadState tstate;
+  {
+    // Hold the lock to avoid concurrent evictions
+    std::lock_guard<MutexType> l(mutex_);
+    LIRSHandle* e = static_cast<LIRSHandle*>(handle);
+    // Entries that are UNINITIALIZED or TOMBSTONE do not count towards
+    // the usage and will be destroyed without interacting with the usage.
+    // Skip modifying the charge.
+    //
+    // In the case of UNINITIALIZED, we know that the caller has a handle
+    // for the entry, so it needs to have been in the cache previously
+    // (i.e. it was found via Lookup() or added via Insert()). So, in this
+    // context, UNINITIALIZED can only mean that it has been evicted.
+    if (e->state() == UNINITIALIZED || e->state() == TOMBSTONE) {
+      return;
+    }
+    DCHECK(e->state() == PROTECTED || e->state() == UNPROTECTED);
+    int64_t delta = charge - handle->charge();
+    handle->set_charge(charge);
+    UpdateMemTracker(delta);
+    // Update usage in existing state, then evict as needed.
+    switch (e->state()) {
+    case PROTECTED:
+      protected_usage_ += delta;
+      EnforceProtectedCapacity(&tstate);
+      break;
+    case UNPROTECTED:
+      unprotected_usage_ += delta;
+      EnforceUnprotectedCapacity(&tstate);
+      break;
+    default:
+      // This can't happen.
+      CHECK(false) << "Unexpected state for UpdateCharge: " << e->state();
+      break;
+    }
+  }
+  CleanupThreadState(&tstate);
+}
+
 void LIRSCacheShard::Release(HandleBase* handle) {
   DCHECK(initialized_);
   LIRSHandle* e = static_cast<LIRSHandle*>(handle);
diff --git a/be/src/util/cache/rl-cache-test.cc b/be/src/util/cache/rl-cache-test.cc
index ec6d48cbc..75cf3d01d 100644
--- a/be/src/util/cache/rl-cache-test.cc
+++ b/be/src/util/cache/rl-cache-test.cc
@@ -212,6 +212,56 @@ TEST_F(FIFOCacheTest, EvictionPolicy) {
   }
 }
 
+TEST_F(FIFOCacheTest, UpdateEvictionPolicy) {
+  static constexpr int kNumElems = 20;
+  const int size_per_elem = cache_size() / kNumElems;
+  // First data chunk: fill the cache up to the capacity.
+  int idx = 0;
+  do {
+    Insert(idx, idx);
+    UpdateCharge(idx, size_per_elem);
+    // Keep looking up the very first entry: this is to make sure lookups
+    // do not affect the recency criteria of the eviction policy for FIFO 
cache.
+    Lookup(0);
+    ++idx;
+  } while (evicted_keys_.empty());
+  ASSERT_GT(idx, 1);
+
+  // Make sure the earliest inserted entry was evicted.
+  ASSERT_EQ(-1, Lookup(0));
+
+  // Verify that the 'empirical' capacity matches the expected capacity
+  // (it's a single-shard cache).
+  const int capacity = idx - 1;
+  ASSERT_EQ(kNumElems, capacity);
+
+  // Second data chunk: add (capacity / 2) more elements.
+  for (int i = 1; i < capacity / 2; ++i) {
+    // Earlier inserted elements should be gone one-by-one as new elements are
+    // inserted, and lookups should not affect the recency criteria of the FIFO
+    // eviction policy.
+    ASSERT_EQ(i, Lookup(i));
+    Insert(capacity + i, capacity + i);
+    UpdateCharge(capacity + i, size_per_elem);
+    ASSERT_EQ(capacity + i, Lookup(capacity + i));
+    ASSERT_EQ(-1, Lookup(i));
+  }
+  ASSERT_EQ(capacity / 2, evicted_keys_.size());
+
+  // Early inserted elements from the first chunk should be evicted
+  // to accommodate the elements from the second chunk.
+  for (int i = 0; i < capacity / 2; ++i) {
+    SCOPED_TRACE(Substitute("early inserted elements: index $0", i));
+    ASSERT_EQ(-1, Lookup(i));
+  }
+  // The later inserted elements from the first chunk should be still
+  // in the cache.
+  for (int i = capacity / 2; i < capacity; ++i) {
+    SCOPED_TRACE(Substitute("late inserted elements: index $0", i));
+    ASSERT_EQ(i, Lookup(i));
+  }
+}
+
 class LRUCacheTest :
     public CacheBaseTest,
     public ::testing::WithParamInterface<ShardingPolicy> {
@@ -255,4 +305,28 @@ TEST_P(LRUCacheTest, EvictionPolicy) {
   ASSERT_EQ(-1, Lookup(200));
 }
 
+TEST_P(LRUCacheTest, UpdateEvictionPolicy) {
+  static constexpr int kNumElems = 1000;
+  const int size_per_elem = cache_size() / kNumElems;
+
+  Insert(100, 101);
+  Insert(200, 201);
+
+  // Loop adding and looking up new entries, but repeatedly accessing key 100.
+  // This frequently-used entry should not be evicted. It also accesses key 200,
+  // but the lookup uses NO_UPDATE, so this key is not preserved.
+  for (int i = 0; i < kNumElems + 1000; ++i) {
+    Insert(1000+i, 2000+i);
+    UpdateCharge(1000+i, size_per_elem);
+    ASSERT_EQ(2000+i, Lookup(1000+i));
+    ASSERT_EQ(101, Lookup(100));
+    int entry200val = Lookup(200, Cache::NO_UPDATE);
+    ASSERT_TRUE(entry200val == -1 || entry200val == 201);
+  }
+  ASSERT_EQ(101, Lookup(100));
+  // Since '200' was accessed using NO_UPDATE in the loop above, it should have
+  // been evicted.
+  ASSERT_EQ(-1, Lookup(200));
+}
+
 }  // namespace impala
diff --git a/be/src/util/cache/rl-cache.cc b/be/src/util/cache/rl-cache.cc
index 5f2497172..967b85022 100644
--- a/be/src/util/cache/rl-cache.cc
+++ b/be/src/util/cache/rl-cache.cc
@@ -56,7 +56,7 @@ namespace {
 // criterion (e.g., access time for LRU policy, insertion time for FIFO policy).
 class RLHandle : public HandleBase {
 public:
-  RLHandle(uint8_t* kv_ptr, const Slice& key, int32_t hash, int val_len, int charge)
+  RLHandle(uint8_t* kv_ptr, const Slice& key, int32_t hash, int val_len, size_t charge)
     : HandleBase(kv_ptr, key, hash, val_len, charge) {
     refs.store(0);
     next = nullptr;
@@ -73,6 +73,12 @@ public:
   std::atomic<int32_t> refs;
 };
 
+struct RLThreadState {
+  // Head for a linked-list of handles to evict. Eviction is done without holding
+  // a mutex.
+  RLHandle* to_remove_head = nullptr;
+};
+
 // A single shard of a cache that uses a recency list based eviction policy
 template<Cache::EvictionPolicy policy>
 class RLCacheShard : public CacheShard {
@@ -85,11 +91,12 @@ class RLCacheShard : public CacheShard {
   ~RLCacheShard();
 
   Status Init() override;
-  HandleBase* Allocate(Slice key, uint32_t hash, int val_len, int charge) override;
+  HandleBase* Allocate(Slice key, uint32_t hash, int val_len, size_t charge) override;
   void Free(HandleBase* handle) override;
   HandleBase* Insert(HandleBase* handle,
       Cache::EvictionCallback* eviction_callback) override;
   HandleBase* Lookup(const Slice& key, uint32_t hash, bool caching) override;
+  void UpdateCharge(HandleBase* handle, size_t charge) override;
   void Release(HandleBase* handle) override;
   void Erase(const Slice& key, uint32_t hash) override;
   size_t Invalidate(const Cache::InvalidationControl& ctl) override;
@@ -106,6 +113,16 @@ class RLCacheShard : public CacheShard {
   // Call the user's eviction callback, if it exists, and free the entry.
   void FreeEntry(RLHandle* e);
 
+  // This enforces the capacity constraint for the cache. This should be
+  // called while holding the cache's lock. This accumulates evicted
+  // entries in the provided RLThreadState, which should be freed
+  // via CleanupThreadState().
+  void EnforceCapacity(RLThreadState* tstate);
+
+  // Functions move evictions outside the critical section for mutex_ by
+  // placing the affected entries on the RLThreadState. This function handles the
+  // accumulated evictions. It should be called without holding any locks.
+  void CleanupThreadState(RLThreadState* tstate);
 
   // Update the memtracker's consumption by the given amount.
   //
@@ -219,6 +236,10 @@ template<Cache::EvictionPolicy policy>
 void RLCacheShard<policy>::RL_Remove(RLHandle* e) {
   e->next->prev = e->prev;
   e->prev->next = e->next;
+  // Null out the next/prev to make it easy to tell that a handle is
+  // not part of the cache.
+  e->next = nullptr;
+  e->prev = nullptr;
   DCHECK_GE(usage_, e->charge());
   usage_ -= e->charge();
 }
@@ -245,11 +266,13 @@ void RLCacheShard<Cache::EvictionPolicy::LRU>::RL_UpdateAfterLookup(RLHandle* e)
 
 template<Cache::EvictionPolicy policy>
 HandleBase* RLCacheShard<policy>::Allocate(Slice key, uint32_t hash, int val_len,
-    int charge) {
+    size_t charge) {
   DCHECK(initialized_);
   int key_len = key.size();
   DCHECK_GE(key_len, 0);
   DCHECK_GE(val_len, 0);
+  DCHECK_GT(charge, 0);
+  if (charge == 0) return nullptr;
   int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*));
   uint8_t* buf = new uint8_t[sizeof(RLHandle)
                              + key_len_padded + val_len]; // the kv_data VLA data
@@ -276,6 +299,28 @@ void RLCacheShard<policy>::Free(HandleBase* handle) {
   delete [] data;
 }
 
+template<Cache::EvictionPolicy policy>
+void RLCacheShard<policy>::EnforceCapacity(RLThreadState* tstate) {
+  while (usage_ > capacity_ && rl_.next != &rl_) {
+    RLHandle* old = rl_.next;
+    RL_Remove(old);
+    table_.Remove(old->key(), old->hash());
+    if (Unref(old)) {
+      old->next = tstate->to_remove_head;
+      tstate->to_remove_head = old;
+    }
+  }
+}
+
+template<Cache::EvictionPolicy policy>
+void RLCacheShard<policy>::CleanupThreadState(RLThreadState* tstate) {
+  while (tstate->to_remove_head != nullptr) {
+    RLHandle* next = tstate->to_remove_head->next;
+    FreeEntry(tstate->to_remove_head);
+    tstate->to_remove_head = next;
+  }
+}
+
 template<Cache::EvictionPolicy policy>
 HandleBase* RLCacheShard<policy>::Lookup(const Slice& key,
                                          uint32_t hash,
@@ -295,6 +340,33 @@ HandleBase* RLCacheShard<policy>::Lookup(const Slice& key,
   return e;
 }
 
+template<Cache::EvictionPolicy policy>
+void RLCacheShard<policy>::UpdateCharge(HandleBase* handle, size_t charge) {
+  DCHECK(initialized_);
+
+  // Check for eviction based on new usage
+  RLThreadState tstate;
+  {
+    // Hold the lock to avoid concurrent evictions
+    std::lock_guard<decltype(mutex_)> l(mutex_);
+    RLHandle* e = static_cast<RLHandle*>(handle);
+    // If the handle is not part of the cache (i.e. it was evicted),
+    // then we shouldn't update the charge. The caller has a handle to
+    // the entry, so it was previously in the cache.
+    if (e->next == nullptr) return;
+    int64_t delta = charge - handle->charge();
+    handle->set_charge(charge);
+    UpdateMemTracker(delta);
+    usage_ += delta;
+
+    // Evict entries as needed to enforce the capacity constraint
+    EnforceCapacity(&tstate);
+  }
+
+  // we free the entries here outside of mutex for performance reasons
+  CleanupThreadState(&tstate);
+}
+
 template<Cache::EvictionPolicy policy>
 void RLCacheShard<policy>::Release(HandleBase* handle) {
   DCHECK(initialized_);
@@ -318,7 +390,7 @@ HandleBase* RLCacheShard<policy>::Insert(
   handle->refs.store(2, std::memory_order_relaxed);
   UpdateMemTracker(handle->charge());
 
-  RLHandle* to_remove_head = nullptr;
+  RLThreadState tstate;
   {
     std::lock_guard<decltype(mutex_)> l(mutex_);
 
@@ -328,29 +400,18 @@ HandleBase* RLCacheShard<policy>::Insert(
     if (old != nullptr) {
       RL_Remove(old);
       if (Unref(old)) {
-        old->next = to_remove_head;
-        to_remove_head = old;
+        old->next = tstate.to_remove_head;
+        tstate.to_remove_head = old;
       }
     }
 
-    while (usage_ > capacity_ && rl_.next != &rl_) {
-      RLHandle* old = rl_.next;
-      RL_Remove(old);
-      table_.Remove(old->key(), old->hash());
-      if (Unref(old)) {
-        old->next = to_remove_head;
-        to_remove_head = old;
-      }
-    }
+    // Evict entries as needed to enforce the capacity constraint
+    EnforceCapacity(&tstate);
   }
 
   // we free the entries here outside of mutex for
   // performance reasons
-  while (to_remove_head != nullptr) {
-    RLHandle* next = to_remove_head->next;
-    FreeEntry(to_remove_head);
-    to_remove_head = next;
-  }
+  CleanupThreadState(&tstate);
 
   return handle;
 }
@@ -380,7 +441,7 @@ size_t RLCacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) {
   DCHECK(initialized_);
   size_t invalid_entry_count = 0;
   size_t valid_entry_count = 0;
-  RLHandle* to_remove_head = nullptr;
+  RLThreadState tstate;
 
   {
     std::lock_guard<decltype(mutex_)> l(mutex_);
@@ -403,8 +464,8 @@ size_t RLCacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) {
       RL_Remove(h_to_remove);
       table_.Remove(h_to_remove->key(), h_to_remove->hash());
       if (Unref(h_to_remove)) {
-        h_to_remove->next = to_remove_head;
-        to_remove_head = h_to_remove;
+        h_to_remove->next = tstate.to_remove_head;
+        tstate.to_remove_head = h_to_remove;
       }
       ++invalid_entry_count;
     }
@@ -412,11 +473,7 @@ size_t RLCacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) {
   // Once removed from the lookup table and the recency list, the entries
   // with no references left must be deallocated because Cache::Release()
   // won't be called for them from elsewhere.
-  while (to_remove_head != nullptr) {
-    RLHandle* next = to_remove_head->next;
-    FreeEntry(to_remove_head);
-    to_remove_head = next;
-  }
+  CleanupThreadState(&tstate);
   return invalid_entry_count;
 }
 

