http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cache.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/cache.cc b/be/src/kudu/util/cache.cc new file mode 100644 index 0000000..00f2e52 --- /dev/null +++ b/be/src/kudu/util/cache.cc @@ -0,0 +1,572 @@ +// Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "kudu/util/cache.h" + +#include <atomic> +#include <cstdint> +#include <cstring> +#include <memory> +#include <mutex> +#include <ostream> +#include <string> +#include <vector> + +#include <gflags/gflags.h> +#include <glog/logging.h> + +#include "kudu/gutil/bits.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/hash/city.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/sysinfo.h" +#include "kudu/util/alignment.h" +#include "kudu/util/cache_metrics.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/locks.h" +#include "kudu/util/malloc.h" +#include "kudu/util/mem_tracker.h" +#include "kudu/util/metrics.h" +#include "kudu/util/slice.h" +#include "kudu/util/test_util_prod.h" + +#if !defined(__APPLE__) +#include "kudu/util/nvm_cache.h" +#endif + +// Useful in tests that require accurate cache capacity accounting. +DEFINE_bool(cache_force_single_shard, false, + "Override all cache implementations to use just one shard"); +TAG_FLAG(cache_force_single_shard, hidden); + +DEFINE_double(cache_memtracker_approximation_ratio, 0.01, + "The MemTracker associated with a cache can accumulate error up to " + "this ratio to improve performance. 
For tests."); +TAG_FLAG(cache_memtracker_approximation_ratio, hidden); + +using std::atomic; +using std::shared_ptr; +using std::string; +using std::vector; + +namespace kudu { + +Cache::~Cache() { +} + +namespace { + +typedef simple_spinlock MutexType; + +// LRU cache implementation + +// An entry is a variable length heap-allocated structure. Entries +// are kept in a circular doubly linked list ordered by access time. +struct LRUHandle { + Cache::EvictionCallback* eviction_callback; + LRUHandle* next_hash; + LRUHandle* next; + LRUHandle* prev; + size_t charge; // TODO(opt): Only allow uint32_t? + uint32_t key_length; + uint32_t val_length; + std::atomic<int32_t> refs; + uint32_t hash; // Hash of key(); used for fast sharding and comparisons + + // The storage for the key/value pair itself. The data is stored as: + // [key bytes ...] [padding up to 8-byte boundary] [value bytes ...] + uint8_t kv_data[1]; // Beginning of key/value pair + + Slice key() const { + return Slice(kv_data, key_length); + } + + uint8_t* mutable_val_ptr() { + int val_offset = KUDU_ALIGN_UP(key_length, sizeof(void*)); + return &kv_data[val_offset]; + } + + const uint8_t* val_ptr() const { + return const_cast<LRUHandle*>(this)->mutable_val_ptr(); + } + + Slice value() const { + return Slice(val_ptr(), val_length); + } +}; + +// We provide our own simple hash table since it removes a whole bunch +// of porting hacks and is also faster than some of the built-in hash +// table implementations in some of the compiler/runtime combinations +// we have tested. E.g., readrandom speeds up by ~5% over the g++ +// 4.4.3's builtin hashtable. 
+class HandleTable { + public: + HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); } + ~HandleTable() { delete[] list_; } + + LRUHandle* Lookup(const Slice& key, uint32_t hash) { + return *FindPointer(key, hash); + } + + LRUHandle* Insert(LRUHandle* h) { + LRUHandle** ptr = FindPointer(h->key(), h->hash); + LRUHandle* old = *ptr; + h->next_hash = (old == nullptr ? nullptr : old->next_hash); + *ptr = h; + if (old == nullptr) { + ++elems_; + if (elems_ > length_) { + // Since each cache entry is fairly large, we aim for a small + // average linked list length (<= 1). + Resize(); + } + } + return old; + } + + LRUHandle* Remove(const Slice& key, uint32_t hash) { + LRUHandle** ptr = FindPointer(key, hash); + LRUHandle* result = *ptr; + if (result != nullptr) { + *ptr = result->next_hash; + --elems_; + } + return result; + } + + private: + // The table consists of an array of buckets where each bucket is + // a linked list of cache entries that hash into the bucket. + uint32_t length_; + uint32_t elems_; + LRUHandle** list_; + + // Return a pointer to slot that points to a cache entry that + // matches key/hash. If there is no such cache entry, return a + // pointer to the trailing slot in the corresponding linked list. 
+ LRUHandle** FindPointer(const Slice& key, uint32_t hash) { + LRUHandle** ptr = &list_[hash & (length_ - 1)]; + while (*ptr != nullptr && + ((*ptr)->hash != hash || key != (*ptr)->key())) { + ptr = &(*ptr)->next_hash; + } + return ptr; + } + + void Resize() { + uint32_t new_length = 16; + while (new_length < elems_ * 1.5) { + new_length *= 2; + } + auto new_list = new LRUHandle*[new_length]; + memset(new_list, 0, sizeof(new_list[0]) * new_length); + uint32_t count = 0; + for (uint32_t i = 0; i < length_; i++) { + LRUHandle* h = list_[i]; + while (h != nullptr) { + LRUHandle* next = h->next_hash; + uint32_t hash = h->hash; + LRUHandle** ptr = &new_list[hash & (new_length - 1)]; + h->next_hash = *ptr; + *ptr = h; + h = next; + count++; + } + } + DCHECK_EQ(elems_, count); + delete[] list_; + list_ = new_list; + length_ = new_length; + } +}; + +// A single shard of sharded cache. +class LRUCache { + public: + explicit LRUCache(MemTracker* tracker); + ~LRUCache(); + + // Separate from constructor so caller can easily make an array of LRUCache + void SetCapacity(size_t capacity) { + capacity_ = capacity; + max_deferred_consumption_ = capacity * FLAGS_cache_memtracker_approximation_ratio; + } + + void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; } + + Cache::Handle* Insert(LRUHandle* handle, Cache::EvictionCallback* eviction_callback); + // Like Cache::Lookup, but with an extra "hash" parameter. + Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching); + void Release(Cache::Handle* handle); + void Erase(const Slice& key, uint32_t hash); + + private: + void LRU_Remove(LRUHandle* e); + void LRU_Append(LRUHandle* e); + // Just reduce the reference count by 1. + // Return true if last reference + bool Unref(LRUHandle* e); + // Call the user's eviction callback, if it exists, and free the entry. + void FreeEntry(LRUHandle* e); + + // Update the memtracker's consumption by the given amount. 
+ // + // This "buffers" the updates locally in 'deferred_consumption_' until the amount + // of accumulated delta is more than ~1% of the cache capacity. This improves + // performance under workloads with high eviction rates for a few reasons: + // + // 1) once the cache reaches its full capacity, we expect it to remain there + // in steady state. Each insertion is usually matched by an eviction, and unless + // the total size of the evicted item(s) is much different than the size of the + // inserted item, each eviction event is unlikely to change the total cache usage + // much. So, we expect that the accumulated error will mostly remain around 0 + // and we can avoid propagating changes to the MemTracker at all. + // + // 2) because the cache implementation is sharded, we do this tracking in a bunch + // of different locations, avoiding bouncing cache-lines between cores. By contrast + // the MemTracker is a simple integer, so it doesn't scale as well under concurrency. + // + // Positive delta indicates an increased memory consumption. + void UpdateMemTracker(int64_t delta); + + // Initialized before use. + size_t capacity_; + + // mutex_ protects the following state. + MutexType mutex_; + size_t usage_; + + // Dummy head of LRU list. + // lru.prev is newest entry, lru.next is oldest entry. + LRUHandle lru_; + + HandleTable table_; + + MemTracker* mem_tracker_; + atomic<int64_t> deferred_consumption_ { 0 }; + + // Initialized based on capacity_ to ensure an upper bound on the error on the + // MemTracker consumption. 
+ int64_t max_deferred_consumption_; + + CacheMetrics* metrics_; +}; + +LRUCache::LRUCache(MemTracker* tracker) + : usage_(0), + mem_tracker_(tracker), + metrics_(nullptr) { + // Make empty circular linked list + lru_.next = &lru_; + lru_.prev = &lru_; +} + +LRUCache::~LRUCache() { + for (LRUHandle* e = lru_.next; e != &lru_; ) { + LRUHandle* next = e->next; + DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 1) + << "caller has an unreleased handle"; + if (Unref(e)) { + FreeEntry(e); + } + e = next; + } + mem_tracker_->Consume(deferred_consumption_); +} + +bool LRUCache::Unref(LRUHandle* e) { + DCHECK_GT(e->refs.load(std::memory_order_relaxed), 0); + return e->refs.fetch_sub(1) == 1; +} + +void LRUCache::FreeEntry(LRUHandle* e) { + DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 0); + if (e->eviction_callback) { + e->eviction_callback->EvictedEntry(e->key(), e->value()); + } + UpdateMemTracker(-static_cast<int64_t>(e->charge)); + if (PREDICT_TRUE(metrics_)) { + metrics_->cache_usage->DecrementBy(e->charge); + metrics_->evictions->Increment(); + } + delete [] e; +} + +void LRUCache::UpdateMemTracker(int64_t delta) { + int64_t old_deferred = deferred_consumption_.fetch_add(delta); + int64_t new_deferred = old_deferred + delta; + + if (new_deferred > max_deferred_consumption_ || + new_deferred < -max_deferred_consumption_) { + int64_t to_propagate = deferred_consumption_.exchange(0, std::memory_order_relaxed); + mem_tracker_->Consume(to_propagate); + } +} + +void LRUCache::LRU_Remove(LRUHandle* e) { + e->next->prev = e->prev; + e->prev->next = e->next; + usage_ -= e->charge; +} + +void LRUCache::LRU_Append(LRUHandle* e) { + // Make "e" newest entry by inserting just before lru_ + e->next = &lru_; + e->prev = lru_.prev; + e->prev->next = e; + e->next->prev = e; + usage_ += e->charge; +} + +Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash, bool caching) { + LRUHandle* e; + { + std::lock_guard<MutexType> l(mutex_); + e = table_.Lookup(key, hash); + if 
(e != nullptr) { + e->refs.fetch_add(1, std::memory_order_relaxed); + LRU_Remove(e); + LRU_Append(e); + } + } + + // Do the metrics outside of the lock. + if (metrics_) { + metrics_->lookups->Increment(); + bool was_hit = (e != nullptr); + if (was_hit) { + if (caching) { + metrics_->cache_hits_caching->Increment(); + } else { + metrics_->cache_hits->Increment(); + } + } else { + if (caching) { + metrics_->cache_misses_caching->Increment(); + } else { + metrics_->cache_misses->Increment(); + } + } + } + + return reinterpret_cast<Cache::Handle*>(e); +} + +void LRUCache::Release(Cache::Handle* handle) { + LRUHandle* e = reinterpret_cast<LRUHandle*>(handle); + bool last_reference = Unref(e); + if (last_reference) { + FreeEntry(e); + } +} + +Cache::Handle* LRUCache::Insert(LRUHandle* e, Cache::EvictionCallback *eviction_callback) { + + // Set the remaining LRUHandle members which were not already allocated during + // Allocate(). + e->eviction_callback = eviction_callback; + e->refs.store(2, std::memory_order_relaxed); // One from LRUCache, one for the returned handle + UpdateMemTracker(e->charge); + if (PREDICT_TRUE(metrics_)) { + metrics_->cache_usage->IncrementBy(e->charge); + metrics_->inserts->Increment(); + } + + LRUHandle* to_remove_head = nullptr; + { + std::lock_guard<MutexType> l(mutex_); + + LRU_Append(e); + + LRUHandle* old = table_.Insert(e); + if (old != nullptr) { + LRU_Remove(old); + if (Unref(old)) { + old->next = to_remove_head; + to_remove_head = old; + } + } + + while (usage_ > capacity_ && lru_.next != &lru_) { + LRUHandle* old = lru_.next; + LRU_Remove(old); + table_.Remove(old->key(), old->hash); + if (Unref(old)) { + old->next = to_remove_head; + to_remove_head = old; + } + } + } + + // we free the entries here outside of mutex for + // performance reasons + while (to_remove_head != nullptr) { + LRUHandle* next = to_remove_head->next; + FreeEntry(to_remove_head); + to_remove_head = next; + } + + return reinterpret_cast<Cache::Handle*>(e); +} + 
+void LRUCache::Erase(const Slice& key, uint32_t hash) { + LRUHandle* e; + bool last_reference = false; + { + std::lock_guard<MutexType> l(mutex_); + e = table_.Remove(key, hash); + if (e != nullptr) { + LRU_Remove(e); + last_reference = Unref(e); + } + } + // mutex not held here + // last_reference will only be true if e != NULL + if (last_reference) { + FreeEntry(e); + } +} + +// Determine the number of bits of the hash that should be used to determine +// the cache shard. This, in turn, determines the number of shards. +int DetermineShardBits() { + int bits = PREDICT_FALSE(FLAGS_cache_force_single_shard) ? + 0 : Bits::Log2Ceiling(base::NumCPUs()); + VLOG(1) << "Will use " << (1 << bits) << " shards for LRU cache."; + return bits; +} + +class ShardedLRUCache : public Cache { + private: + shared_ptr<MemTracker> mem_tracker_; + gscoped_ptr<CacheMetrics> metrics_; + vector<LRUCache*> shards_; + + // Number of bits of hash used to determine the shard. + const int shard_bits_; + + // Protects 'metrics_'. Used only when metrics are set, to ensure + // that they are set only once in test environments. + MutexType metrics_lock_; + + static inline uint32_t HashSlice(const Slice& s) { + return util_hash::CityHash64( + reinterpret_cast<const char *>(s.data()), s.size()); + } + + uint32_t Shard(uint32_t hash) { + // Widen to uint64 before shifting, or else on a single CPU, + // we would try to shift a uint32_t by 32 bits, which is undefined. + return static_cast<uint64_t>(hash) >> (32 - shard_bits_); + } + + public: + explicit ShardedLRUCache(size_t capacity, const string& id) + : shard_bits_(DetermineShardBits()) { + // A cache is often a singleton, so: + // 1. We reuse its MemTracker if one already exists, and + // 2. It is directly parented to the root MemTracker. 
+ mem_tracker_ = MemTracker::FindOrCreateGlobalTracker( + -1, strings::Substitute("$0-sharded_lru_cache", id)); + + int num_shards = 1 << shard_bits_; + const size_t per_shard = (capacity + (num_shards - 1)) / num_shards; + for (int s = 0; s < num_shards; s++) { + gscoped_ptr<LRUCache> shard(new LRUCache(mem_tracker_.get())); + shard->SetCapacity(per_shard); + shards_.push_back(shard.release()); + } + } + + virtual ~ShardedLRUCache() { + STLDeleteElements(&shards_); + } + + virtual Handle* Insert(PendingHandle* handle, + Cache::EvictionCallback* eviction_callback) OVERRIDE { + LRUHandle* h = reinterpret_cast<LRUHandle*>(DCHECK_NOTNULL(handle)); + return shards_[Shard(h->hash)]->Insert(h, eviction_callback); + } + virtual Handle* Lookup(const Slice& key, CacheBehavior caching) OVERRIDE { + const uint32_t hash = HashSlice(key); + return shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE); + } + virtual void Release(Handle* handle) OVERRIDE { + LRUHandle* h = reinterpret_cast<LRUHandle*>(handle); + shards_[Shard(h->hash)]->Release(handle); + } + virtual void Erase(const Slice& key) OVERRIDE { + const uint32_t hash = HashSlice(key); + shards_[Shard(hash)]->Erase(key, hash); + } + virtual Slice Value(Handle* handle) OVERRIDE { + return reinterpret_cast<LRUHandle*>(handle)->value(); + } + virtual void SetMetrics(const scoped_refptr<MetricEntity>& entity) OVERRIDE { + // TODO(KUDU-2165): reuse of the Cache singleton across multiple MiniCluster servers + // causes TSAN errors. So, we'll ensure that metrics only get attached once, from + // whichever server starts first. This has the downside that, in test builds, we won't + // get accurate cache metrics, but that's probably better than spurious failures. 
+ std::lock_guard<simple_spinlock> l(metrics_lock_); + if (metrics_) { + CHECK(IsGTest()) << "Metrics should only be set once per Cache singleton"; + return; + } + metrics_.reset(new CacheMetrics(entity)); + for (LRUCache* cache : shards_) { + cache->SetMetrics(metrics_.get()); + } + } + + virtual PendingHandle* Allocate(Slice key, int val_len, int charge) OVERRIDE { + int key_len = key.size(); + DCHECK_GE(key_len, 0); + DCHECK_GE(val_len, 0); + int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*)); + uint8_t* buf = new uint8_t[sizeof(LRUHandle) + + key_len_padded + val_len // the kv_data VLA data + - 1 // (the VLA has a 1-byte placeholder) + ]; + LRUHandle* handle = reinterpret_cast<LRUHandle*>(buf); + handle->key_length = key_len; + handle->val_length = val_len; + handle->charge = (charge == kAutomaticCharge) ? kudu_malloc_usable_size(buf) : charge; + handle->hash = HashSlice(key); + memcpy(handle->kv_data, key.data(), key_len); + + return reinterpret_cast<PendingHandle*>(handle); + } + + virtual void Free(PendingHandle* h) OVERRIDE { + uint8_t* data = reinterpret_cast<uint8_t*>(h); + delete [] data; + } + + virtual uint8_t* MutableValue(PendingHandle* h) OVERRIDE { + return reinterpret_cast<LRUHandle*>(h)->mutable_val_ptr(); + } + +}; + +} // end anonymous namespace + +Cache* NewLRUCache(CacheType type, size_t capacity, const string& id) { + switch (type) { + case DRAM_CACHE: + return new ShardedLRUCache(capacity, id); +#if defined(HAVE_LIB_VMEM) + case NVM_CACHE: + return NewLRUNvmCache(capacity, id); +#endif + default: + LOG(FATAL) << "Unsupported LRU cache type: " << type; + } +} + +} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cache.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/cache.h b/be/src/kudu/util/cache.h new file mode 100644 index 0000000..82ef8c9 --- /dev/null +++ b/be/src/kudu/util/cache.h @@ -0,0 +1,216 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// A Cache is an interface that maps keys to values. It has internal +// synchronization and may be safely accessed concurrently from +// multiple threads. It may automatically evict entries to make room +// for new entries. Values have a specified charge against the cache +// capacity. For example, a cache where the values are variable +// length strings, may use the length of the string as the charge for +// the string. +// +// This is taken from LevelDB and evolved to fit the kudu codebase. +// +// TODO: this is pretty lock-heavy. Would be good to sub out something +// a little more concurrent. + +#ifndef KUDU_UTIL_CACHE_H_ +#define KUDU_UTIL_CACHE_H_ + +#include <cstddef> +#include <cstdint> +#include <memory> +#include <string> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/slice.h" + +namespace kudu { + +class Cache; +class MetricEntity; + +enum CacheType { + DRAM_CACHE, + NVM_CACHE +}; + +// Create a new cache with a fixed size capacity. This implementation +// of Cache uses a least-recently-used eviction policy. +Cache* NewLRUCache(CacheType type, size_t capacity, const std::string& id); + +class Cache { + public: + // Callback interface which is called when an entry is evicted from the + // cache. 
+ class EvictionCallback { + public: + virtual void EvictedEntry(Slice key, Slice value) = 0; + virtual ~EvictionCallback() {} + }; + + Cache() { } + + // Destroys all existing entries by calling the "deleter" + // function that was passed to the constructor. + virtual ~Cache(); + + // Opaque handle to an entry stored in the cache. + struct Handle { }; + + // Custom handle "deleter", primarily intended for use with std::unique_ptr. + // + // Sample usage: + // + // Cache* cache = NewLRUCache(...); + // ... + // { + // unique_ptr<Cache::Handle, Cache::HandleDeleter> h( + // cache->Lookup(...), Cache::HandleDeleter(cache)); + // ... + // } // 'h' is automatically released here + // + // Or: + // + // Cache* cache = NewLRUCache(...); + // ... + // { + // Cache::UniqueHandle h(cache->Lookup(...), Cache::HandleDeleter(cache)); + // ... + // } // 'h' is automatically released here + // + class HandleDeleter { + public: + explicit HandleDeleter(Cache* c) + : c_(c) { + } + + void operator()(Cache::Handle* h) const { + c_->Release(h); + } + + private: + Cache* c_; + }; + typedef std::unique_ptr<Handle, HandleDeleter> UniqueHandle; + + // Passing EXPECT_IN_CACHE will increment the hit/miss metrics that track the number of times + // blocks were requested that the users were hoping to get the block from the cache, along with + // with the basic metrics. + // Passing NO_EXPECT_IN_CACHE will only increment the basic metrics. + // This helps in determining if we are effectively caching the blocks that matter the most. + enum CacheBehavior { + EXPECT_IN_CACHE, + NO_EXPECT_IN_CACHE + }; + + // If the cache has no mapping for "key", returns NULL. + // + // Else return a handle that corresponds to the mapping. The caller + // must call this->Release(handle) when the returned mapping is no + // longer needed. + virtual Handle* Lookup(const Slice& key, CacheBehavior caching) = 0; + + // Release a mapping returned by a previous Lookup(). 
+ // REQUIRES: handle must not have been released yet. + // REQUIRES: handle must have been returned by a method on *this. + virtual void Release(Handle* handle) = 0; + + // Return the value encapsulated in a handle returned by a + // successful Lookup(). + // REQUIRES: handle must not have been released yet. + // REQUIRES: handle must have been returned by a method on *this. + virtual Slice Value(Handle* handle) = 0; + + // If the cache contains entry for key, erase it. Note that the + // underlying entry will be kept around until all existing handles + // to it have been released. + virtual void Erase(const Slice& key) = 0; + + // Pass a metric entity in order to start recoding metrics. + virtual void SetMetrics(const scoped_refptr<MetricEntity>& metric_entity) = 0; + + // ------------------------------------------------------------ + // Insertion path + // ------------------------------------------------------------ + // + // Because some cache implementations (eg NVM) manage their own memory, and because we'd + // like to read blocks directly into cache-managed memory rather than causing an extra + // memcpy, the insertion of a new element into the cache requires two phases. First, a + // PendingHandle is allocated with space for the value, and then it is later inserted. + // + // For example: + // + // PendingHandle* ph = cache_->Allocate("my entry", value_size, charge); + // if (!ReadDataFromDisk(cache_->MutableValue(ph)).ok()) { + // cache_->Free(ph); + // ... error handling ... + // return; + // } + // Handle* h = cache_->Insert(ph, my_eviction_callback); + // ... + // cache_->Release(h); + + // Opaque handle to an entry which is being prepared to be added to + // the cache. + struct PendingHandle { }; + + // Indicates that the charge of an item in the cache should be calculated + // based on its memory consumption. + static constexpr int kAutomaticCharge = -1; + + // Allocate space for a new entry to be inserted into the cache. 
+ // + // The provided 'key' is copied into the resulting handle object. + // The allocated handle has enough space such that the value can + // be written into cache_->MutableValue(handle). + // + // If 'charge' is not 'kAutomaticCharge', then the cache capacity will be charged + // the explicit amount. This is useful when caching items that are small but need to + // maintain a bounded count (eg file descriptors) rather than caring about their actual + // memory usage. + // + // Note that this does not mutate the cache itself: lookups will + // not be able to find the provided key until it is inserted. + // + // It is possible that this will return NULL if the cache is above its capacity + // and eviction fails to free up enough space for the requested allocation. + // + // NOTE: the returned memory is not automatically freed by the cache: the + // caller must either free it using Free(), or insert it using Insert(). + virtual PendingHandle* Allocate(Slice key, int val_len, int charge) = 0; + + // Default 'charge' should be kAutomaticCharge. + // (default arguments on virtual functions are prohibited) + PendingHandle* Allocate(Slice key, int val_len) { + return Allocate(key, val_len, kAutomaticCharge); + } + + virtual uint8_t* MutableValue(PendingHandle* handle) = 0; + + // Commit a prepared entry into the cache. + // + // Returns a handle that corresponds to the mapping. The caller + // must call this->Release(handle) when the returned mapping is no + // longer needed. This method always succeeds and returns a non-null + // entry, since the space was reserved above. + // + // The 'pending' entry passed here should have been allocated using + // Cache::Allocate() above. + // + // If 'eviction_callback' is non-NULL, then it will be called when the + // entry is later evicted or when the cache shuts down. 
+ virtual Handle* Insert(PendingHandle* pending, EvictionCallback* eviction_callback) = 0; + + // Free 'ptr', which must have been previously allocated using 'Allocate'. + virtual void Free(PendingHandle* ptr) = 0; + + private: + DISALLOW_COPY_AND_ASSIGN(Cache); +}; + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cache_metrics.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/cache_metrics.cc b/be/src/kudu/util/cache_metrics.cc new file mode 100644 index 0000000..ac2fadf --- /dev/null +++ b/be/src/kudu/util/cache_metrics.cc @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/cache_metrics.h" + +#include "kudu/util/metrics.h" + +METRIC_DEFINE_counter(server, block_cache_inserts, + "Block Cache Inserts", kudu::MetricUnit::kBlocks, + "Number of blocks inserted in the cache"); +METRIC_DEFINE_counter(server, block_cache_lookups, + "Block Cache Lookups", kudu::MetricUnit::kBlocks, + "Number of blocks looked up from the cache"); +METRIC_DEFINE_counter(server, block_cache_evictions, + "Block Cache Evictions", kudu::MetricUnit::kBlocks, + "Number of blocks evicted from the cache"); +METRIC_DEFINE_counter(server, block_cache_misses, + "Block Cache Misses", kudu::MetricUnit::kBlocks, + "Number of lookups that didn't yield a block"); +METRIC_DEFINE_counter(server, block_cache_misses_caching, + "Block Cache Misses (Caching)", kudu::MetricUnit::kBlocks, + "Number of lookups that were expecting a block that didn't yield one." + "Use this number instead of cache_misses when trying to determine how " + "efficient the cache is"); +METRIC_DEFINE_counter(server, block_cache_hits, + "Block Cache Hits", kudu::MetricUnit::kBlocks, + "Number of lookups that found a block"); +METRIC_DEFINE_counter(server, block_cache_hits_caching, + "Block Cache Hits (Caching)", kudu::MetricUnit::kBlocks, + "Number of lookups that were expecting a block that found one." 
+ "Use this number instead of cache_hits when trying to determine how " + "efficient the cache is"); + +METRIC_DEFINE_gauge_uint64(server, block_cache_usage, "Block Cache Memory Usage", + kudu::MetricUnit::kBytes, + "Memory consumed by the block cache"); + +namespace kudu { + +#define MINIT(member, x) member(METRIC_##x.Instantiate(entity)) +#define GINIT(member, x) member(METRIC_##x.Instantiate(entity, 0)) +CacheMetrics::CacheMetrics(const scoped_refptr<MetricEntity>& entity) + : MINIT(inserts, block_cache_inserts), + MINIT(lookups, block_cache_lookups), + MINIT(evictions, block_cache_evictions), + MINIT(cache_hits, block_cache_hits), + MINIT(cache_hits_caching, block_cache_hits_caching), + MINIT(cache_misses, block_cache_misses), + MINIT(cache_misses_caching, block_cache_misses_caching), + GINIT(cache_usage, block_cache_usage) { +} +#undef MINIT +#undef GINIT + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cache_metrics.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/cache_metrics.h b/be/src/kudu/util/cache_metrics.h new file mode 100644 index 0000000..04a546b --- /dev/null +++ b/be/src/kudu/util/cache_metrics.h @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_CACHE_METRICS_H +#define KUDU_UTIL_CACHE_METRICS_H + +#include <cstdint> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/metrics.h" + +namespace kudu { + +struct CacheMetrics { + explicit CacheMetrics(const scoped_refptr<MetricEntity>& metric_entity); + + scoped_refptr<Counter> inserts; + scoped_refptr<Counter> lookups; + scoped_refptr<Counter> evictions; + scoped_refptr<Counter> cache_hits; + scoped_refptr<Counter> cache_hits_caching; + scoped_refptr<Counter> cache_misses; + scoped_refptr<Counter> cache_misses_caching; + + scoped_refptr<AtomicGauge<uint64_t> > cache_usage; +}; + +} // namespace kudu +#endif /* KUDU_UTIL_CACHE_METRICS_H */ http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/callback_bind-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/callback_bind-test.cc b/be/src/kudu/util/callback_bind-test.cc new file mode 100644 index 0000000..392f496 --- /dev/null +++ b/be/src/kudu/util/callback_bind-test.cc @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <ostream> +#include <string> +#include <type_traits> +#include <utility> + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/bind.h" +#include "kudu/gutil/bind_helpers.h" +#include "kudu/gutil/callback.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/ref_counted.h" + +namespace kudu { + +using std::string; + +static int Return5() { + return 5; +} + +TEST(CallbackBindTest, TestFreeFunction) { + Callback<int(void)> func_cb = Bind(&Return5); + ASSERT_EQ(5, func_cb.Run()); +} + +class Ref : public RefCountedThreadSafe<Ref> { + public: + int Foo() { return 3; } +}; + +// Simple class that helps with verifying ref counting. +// Not thread-safe. +struct RefCountable { + RefCountable() + : refs(0) { + } + void AddRef() const { + refs++; + } + void Release() const { + refs--; + } + void Print() const { + LOG(INFO) << "Hello. Refs: " << refs; + } + + mutable int refs; + DISALLOW_COPY_AND_ASSIGN(RefCountable); +}; + +TEST(CallbackBindTest, TestClassMethod) { + scoped_refptr<Ref> ref = new Ref(); + Callback<int(void)> ref_cb = Bind(&Ref::Foo, ref); + ref = nullptr; + ASSERT_EQ(3, ref_cb.Run()); +} + +int ReturnI(int i, const char* str) { + return i; +} + +TEST(CallbackBindTest, TestPartialBind) { + Callback<int(const char*)> cb = Bind(&ReturnI, 23); + ASSERT_EQ(23, cb.Run("hello world")); +} + +char IncrementChar(gscoped_ptr<char> in) { + return *in + 1; +} + +TEST(CallbackBindTest, TestCallScopedPtrArg) { + // Calling a function with a gscoped_ptr argument is just like any other + // function which takes gscoped_ptr: + gscoped_ptr<char> foo(new char('x')); + Callback<char(gscoped_ptr<char>)> cb = Bind(&IncrementChar); + ASSERT_EQ('y', cb.Run(std::move(foo))); +} + +TEST(CallbackBindTest, TestBindScopedPtrArg) { + // Binding a function with a gscoped_ptr argument requires using Passed() + gscoped_ptr<char> foo(new char('x')); + Callback<char(void)> cb = Bind(&IncrementChar, Passed(&foo)); + 
ASSERT_EQ('y', cb.Run()); +} + +// Test that the ref counting functionality works. +TEST(CallbackBindTest, TestRefCounting) { + RefCountable countable; + { + ASSERT_EQ(0, countable.refs); + Closure cb = Bind(&RefCountable::Print, &countable); + ASSERT_EQ(1, countable.refs); + cb.Run(); + ASSERT_EQ(1, countable.refs); + } + ASSERT_EQ(0, countable.refs); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/coding-inl.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/coding-inl.h b/be/src/kudu/util/coding-inl.h new file mode 100644 index 0000000..a47e9ce --- /dev/null +++ b/be/src/kudu/util/coding-inl.h @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// Some portions Copyright (c) 2011 The LevelDB Authors. 
+// +// Endian-neutral encoding: +// * Fixed-length numbers are encoded with least-significant byte first +// * In addition we support variable length "varint" encoding +// * Strings are encoded prefixed by their length in varint format + +#ifndef KUDU_UTIL_CODING_INL_H +#define KUDU_UTIL_CODING_INL_H + +#include <cstdint> +#include <cstring> + +#include "kudu/gutil/port.h" // IWYU pragma: keep +// IWYU pragma: no_include <endian.h> + +namespace kudu { + +inline uint8_t *InlineEncodeVarint32(uint8_t *dst, uint32_t v) { + // Operate on characters as unsigneds + uint8_t *ptr = dst; + static const int B = 128; + if (v < (1<<7)) { + *(ptr++) = v; + } else if (v < (1<<14)) { + *(ptr++) = v | B; + *(ptr++) = v>>7; + } else if (v < (1<<21)) { + *(ptr++) = v | B; + *(ptr++) = (v>>7) | B; + *(ptr++) = v>>14; + } else if (v < (1<<28)) { + *(ptr++) = v | B; + *(ptr++) = (v>>7) | B; + *(ptr++) = (v>>14) | B; + *(ptr++) = v>>21; + } else { + *(ptr++) = v | B; + *(ptr++) = (v>>7) | B; + *(ptr++) = (v>>14) | B; + *(ptr++) = (v>>21) | B; + *(ptr++) = v>>28; + } + return ptr; +} + +inline void InlineEncodeFixed32(uint8_t *buf, uint32_t value) { +#if __BYTE_ORDER == __LITTLE_ENDIAN + memcpy(buf, &value, sizeof(value)); +#else + buf[0] = value & 0xff; + buf[1] = (value >> 8) & 0xff; + buf[2] = (value >> 16) & 0xff; + buf[3] = (value >> 24) & 0xff; +#endif +} + +inline void InlineEncodeFixed64(uint8_t *buf, uint64_t value) { +#if __BYTE_ORDER == __LITTLE_ENDIAN + memcpy(buf, &value, sizeof(value)); +#else + buf[0] = value & 0xff; + buf[1] = (value >> 8) & 0xff; + buf[2] = (value >> 16) & 0xff; + buf[3] = (value >> 24) & 0xff; + buf[4] = (value >> 32) & 0xff; + buf[5] = (value >> 40) & 0xff; + buf[6] = (value >> 48) & 0xff; + buf[7] = (value >> 56) & 0xff; +#endif +} + + +// Standard Put... 
routines append to a string +template <class StrType> +inline void InlinePutFixed32(StrType *dst, uint32_t value) { + uint8_t buf[sizeof(value)]; + InlineEncodeFixed32(buf, value); + dst->append(buf, sizeof(buf)); +} + +template <class StrType> +inline void InlinePutFixed64(StrType *dst, uint64_t value) { + uint8_t buf[sizeof(value)]; + InlineEncodeFixed64(buf, value); + dst->append(buf, sizeof(buf)); +} + +template <class StrType> +inline void InlinePutVarint32(StrType* dst, uint32_t v) { + // We resize the array and then size it back down as appropriate + // rather than using append(), since the generated code ends up + // being substantially shorter. + int old_size = dst->size(); + dst->resize(old_size + 5); + uint8_t* p = &(*dst)[old_size]; + uint8_t *ptr = InlineEncodeVarint32(p, v); + + dst->resize(old_size + ptr - p); +} + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/coding.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/coding.cc b/be/src/kudu/util/coding.cc new file mode 100644 index 0000000..952af28 --- /dev/null +++ b/be/src/kudu/util/coding.cc @@ -0,0 +1,142 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#include "kudu/util/coding.h" +#include "kudu/util/coding-inl.h" +#include "kudu/util/faststring.h" + +namespace kudu { + +void PutVarint32(faststring* dst, uint32_t v) { + uint8_t buf[5]; + uint8_t* ptr = InlineEncodeVarint32(buf, v); + dst->append(buf, ptr - buf); +} + +uint8_t* EncodeVarint64(uint8_t* dst, uint64_t v) { + static const int B = 128; + while (v >= B) { + *(dst++) = (v & (B-1)) | B; + v >>= 7; + } + *(dst++) = static_cast<uint8_t>(v); + return dst; +} + +void PutFixed32(faststring *dst, uint32_t value) { + InlinePutFixed32(dst, value); +} + +void PutFixed64(faststring *dst, uint64_t value) { + InlinePutFixed64(dst, value); +} + +void PutVarint64(faststring *dst, uint64_t v) { + uint8_t buf[10]; + uint8_t* ptr = EncodeVarint64(buf, v); + dst->append(buf, ptr - buf); +} + +void PutLengthPrefixedSlice(faststring* dst, const Slice& value) { + PutVarint32(dst, value.size()); + dst->append(value.data(), value.size()); +} + +void PutFixed32LengthPrefixedSlice(faststring* dst, const Slice& value) { + PutFixed32(dst, value.size()); + dst->append(value.data(), value.size()); +} + +int VarintLength(uint64_t v) { + int len = 1; + while (v >= 128) { + v >>= 7; + len++; + } + return len; +} + +const uint8_t *GetVarint32PtrFallback(const uint8_t *p, + const uint8_t *limit, + uint32_t* value) { + uint32_t result = 0; + for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) { + uint32_t byte = *p; + p++; + if (byte & 128) { + // More bytes are present + result |= ((byte & 127) << shift); + } else { + result |= (byte << shift); + *value = result; + return p; + } + } + return nullptr; +} + +bool GetVarint32(Slice* input, uint32_t* value) { + const uint8_t *p = input->data(); + const uint8_t *limit = p + input->size(); + const uint8_t *q = GetVarint32Ptr(p, limit, value); + if (q == nullptr) { + return false; + } else { + *input = Slice(q, limit - q); + return true; + } +} + +const uint8_t *GetVarint64Ptr(const uint8_t *p, const uint8_t *limit, uint64_t* 
value) { + uint64_t result = 0; + for (uint32_t shift = 0; shift <= 63 && p < limit; shift += 7) { + uint64_t byte = *p; + p++; + if (byte & 128) { + // More bytes are present + result |= ((byte & 127) << shift); + } else { + result |= (byte << shift); + *value = result; + return p; + } + } + return nullptr; +} + +bool GetVarint64(Slice* input, uint64_t* value) { + const uint8_t *p = input->data(); + const uint8_t *limit = p + input->size(); + const uint8_t *q = GetVarint64Ptr(p, limit, value); + if (q == nullptr) { + return false; + } else { + *input = Slice(q, limit - q); + return true; + } +} + +const uint8_t *GetLengthPrefixedSlice(const uint8_t *p, const uint8_t *limit, + Slice* result) { + uint32_t len = 0; + p = GetVarint32Ptr(p, limit, &len); + if (p == nullptr) return nullptr; + if (p + len > limit) return nullptr; + *result = Slice(p, len); + return p + len; +} + +bool GetLengthPrefixedSlice(Slice* input, Slice* result) { + uint32_t len = 0; + if (GetVarint32(input, &len) && + input->size() >= len) { + *result = Slice(input->data(), len); + input->remove_prefix(len); + return true; + } else { + return false; + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/coding.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/coding.h b/be/src/kudu/util/coding.h new file mode 100644 index 0000000..0612533 --- /dev/null +++ b/be/src/kudu/util/coding.h @@ -0,0 +1,113 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+// +// Endian-neutral encoding: +// * Fixed-length numbers are encoded with least-significant byte first +// * In addition we support variable length "varint" encoding +// * Strings are encoded prefixed by their length in varint format + +#ifndef STORAGE_LEVELDB_UTIL_CODING_H_ +#define STORAGE_LEVELDB_UTIL_CODING_H_ + +#include <cstdint> +#include <cstring> + +#include "kudu/util/slice.h" +#include "kudu/gutil/port.h" // IWYU pragma: keep +// IWYU pragma: no_include <endian.h> + +namespace kudu { + +class faststring; + +extern void PutFixed32(faststring* dst, uint32_t value); +extern void PutFixed64(faststring* dst, uint64_t value); +extern void PutVarint32(faststring* dst, uint32_t value); +extern void PutVarint64(faststring* dst, uint64_t value); + +// Put a length-prefixed Slice into the buffer. The length prefix +// is varint-encoded. +extern void PutLengthPrefixedSlice(faststring* dst, const Slice& value); + +// Put a length-prefixed Slice into the buffer. The length prefix +// is 32-bit fixed encoded in little endian. +extern void PutFixed32LengthPrefixedSlice(faststring* dst, const Slice& value); + +// Standard Get... routines parse a value from the beginning of a Slice +// and advance the slice past the parsed value. +extern bool GetVarint32(Slice* input, uint32_t* value); +extern bool GetVarint64(Slice* input, uint64_t* value); +extern bool GetLengthPrefixedSlice(Slice* input, Slice* result); + +// Pointer-based variants of GetVarint... These either store a value +// in *v and return a pointer just past the parsed value, or return +// NULL on error. These routines only look at bytes in the range +// [p..limit-1] +extern const uint8_t *GetVarint32Ptr(const uint8_t *p,const uint8_t *limit, uint32_t* v); +extern const uint8_t *GetVarint64Ptr(const uint8_t *p,const uint8_t *limit, uint64_t* v); + +// Returns the length of the varint32 or varint64 encoding of "v" +extern int VarintLength(uint64_t v); + +// Lower-level versions of Put... 
that write directly into a character buffer +// REQUIRES: dst has enough space for the value being written +extern void EncodeFixed32(uint8_t *dst, uint32_t value); +extern void EncodeFixed64(uint8_t *dst, uint64_t value); + +// Lower-level versions of Put... that write directly into a character buffer +// and return a pointer just past the last byte written. +// REQUIRES: dst has enough space for the value being written +extern uint8_t *EncodeVarint32(uint8_t *dst, uint32_t value); +extern uint8_t *EncodeVarint64(uint8_t *dst, uint64_t value); + +// Lower-level versions of Get... that read directly from a character buffer +// without any bounds checking. + +inline uint32_t DecodeFixed32(const uint8_t *ptr) { +#if __BYTE_ORDER == __LITTLE_ENDIAN + // Load the raw bytes + uint32_t result; + memcpy(&result, ptr, sizeof(result)); // gcc optimizes this to a plain load + return result; +#else + return ((static_cast<uint32_t>(static_cast<unsigned char>(ptr[0]))) + | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[1])) << 8) + | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[2])) << 16) + | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[3])) << 24)); +#endif +} + +inline uint64_t DecodeFixed64(const uint8_t *ptr) { +#if __BYTE_ORDER == __LITTLE_ENDIAN + // Load the raw bytes + uint64_t result; + memcpy(&result, ptr, sizeof(result)); // gcc optimizes this to a plain load + return result; +#else + uint64_t lo = DecodeFixed32(ptr); + uint64_t hi = DecodeFixed32(ptr + 4); + return (hi << 32) | lo; +#endif +} + +// Internal routine for use by fallback path of GetVarint32Ptr +extern const uint8_t *GetVarint32PtrFallback(const uint8_t *p, + const uint8_t *limit, + uint32_t* value); +inline const uint8_t *GetVarint32Ptr(const uint8_t *p, + const uint8_t *limit, + uint32_t* value) { + if (PREDICT_TRUE(p < limit)) { + uint32_t result = *p; + if (PREDICT_TRUE((result & 128) == 0)) { + *value = result; + return p + 1; + } + } + return GetVarint32PtrFallback(p, 
limit, value); +} + +} // namespace kudu + +#endif // STORAGE_LEVELDB_UTIL_CODING_H_ http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/compression/compression-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/compression/compression-test.cc b/be/src/kudu/util/compression/compression-test.cc new file mode 100644 index 0000000..6b46a4f --- /dev/null +++ b/be/src/kudu/util/compression/compression-test.cc @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <cstdint> +#include <cstdlib> +#include <cstring> +#include <vector> + +#include <gtest/gtest.h> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/util/compression/compression.pb.h" +#include "kudu/util/compression/compression_codec.h" +#include "kudu/util/slice.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" + +namespace kudu { + +using std::vector; + +class TestCompression : public KuduTest {}; + +static void TestCompressionCodec(CompressionType compression) { + const int kInputSize = 64; + + const CompressionCodec* codec; + uint8_t ibuffer[kInputSize]; + uint8_t ubuffer[kInputSize]; + size_t compressed; + + // Fill the test input buffer + memset(ibuffer, 'Z', kInputSize); + + // Get the specified compression codec + ASSERT_OK(GetCompressionCodec(compression, &codec)); + + // Allocate the compression buffer + size_t max_compressed = codec->MaxCompressedLength(kInputSize); + ASSERT_LT(max_compressed, (kInputSize * 2)); + gscoped_array<uint8_t> cbuffer(new uint8_t[max_compressed]); + + // Compress and uncompress + ASSERT_OK(codec->Compress(Slice(ibuffer, kInputSize), cbuffer.get(), &compressed)); + ASSERT_OK(codec->Uncompress(Slice(cbuffer.get(), compressed), ubuffer, kInputSize)); + ASSERT_EQ(0, memcmp(ibuffer, ubuffer, kInputSize)); + + // Compress slices and uncompress + vector<Slice> v; + v.emplace_back(ibuffer, 1); + for (int i = 1; i <= kInputSize; i += 7) + v.emplace_back(ibuffer + i, 7); + ASSERT_OK(codec->Compress(Slice(ibuffer, kInputSize), cbuffer.get(), &compressed)); + ASSERT_OK(codec->Uncompress(Slice(cbuffer.get(), compressed), ubuffer, kInputSize)); + ASSERT_EQ(0, memcmp(ibuffer, ubuffer, kInputSize)); +} + +TEST_F(TestCompression, TestNoCompressionCodec) { + const CompressionCodec* codec; + ASSERT_OK(GetCompressionCodec(NO_COMPRESSION, &codec)); + ASSERT_EQ(nullptr, codec); +} + +TEST_F(TestCompression, TestSnappyCompressionCodec) { + TestCompressionCodec(SNAPPY); +} + +TEST_F(TestCompression, 
TestLz4CompressionCodec) { + TestCompressionCodec(LZ4); +} + +TEST_F(TestCompression, TestZlibCompressionCodec) { + TestCompressionCodec(ZLIB); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/compression/compression.proto ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/compression/compression.proto b/be/src/kudu/util/compression/compression.proto new file mode 100644 index 0000000..a0f5343 --- /dev/null +++ b/be/src/kudu/util/compression/compression.proto @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+syntax = "proto2"; +package kudu; + +option java_package = "org.apache.kudu"; + +enum CompressionType { + UNKNOWN_COMPRESSION = 999; + DEFAULT_COMPRESSION = 0; + NO_COMPRESSION = 1; + SNAPPY = 2; + LZ4 = 3; + ZLIB = 4; +} http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/compression/compression_codec.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/compression/compression_codec.cc b/be/src/kudu/util/compression/compression_codec.cc new file mode 100644 index 0000000..a2231b6 --- /dev/null +++ b/be/src/kudu/util/compression/compression_codec.cc @@ -0,0 +1,286 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/compression/compression_codec.h" + +#include <memory> +#include <ostream> +#include <string> +#include <vector> + +#include <glog/logging.h> +#include <lz4.h> +#include <snappy-sinksource.h> +#include <snappy.h> +#include <zlib.h> + +#include "kudu/gutil/port.h" +#include "kudu/gutil/singleton.h" +#include "kudu/gutil/stringprintf.h" +#include "kudu/util/faststring.h" +#include "kudu/util/logging.h" +#include "kudu/util/string_case.h" + +namespace kudu { + +using std::vector; + +CompressionCodec::CompressionCodec() { +} +CompressionCodec::~CompressionCodec() { +} + +class SlicesSource : public snappy::Source { + public: + explicit SlicesSource(const std::vector<Slice>& slices) + : slice_index_(0), + slice_offset_(0), + slices_(slices) { + available_ = TotalSize(); + } + + size_t Available() const OVERRIDE { + return available_; + } + + const char* Peek(size_t* len) OVERRIDE { + if (available_ == 0) { + *len = 0; + return nullptr; + } + + const Slice& data = slices_[slice_index_]; + *len = data.size() - slice_offset_; + return reinterpret_cast<const char *>(data.data()) + slice_offset_; + } + + void Skip(size_t n) OVERRIDE { + DCHECK_LE(n, Available()); + if (n == 0) return; + + available_ -= n; + if ((n + slice_offset_) < slices_[slice_index_].size()) { + slice_offset_ += n; + } else { + n -= slices_[slice_index_].size() - slice_offset_; + slice_index_++; + while (n > 0 && n >= slices_[slice_index_].size()) { + n -= slices_[slice_index_].size(); + slice_index_++; + } + slice_offset_ = n; + } + } + + void Dump(faststring *buffer) { + buffer->reserve(buffer->size() + TotalSize()); + for (const Slice& block : slices_) { + buffer->append(block.data(), block.size()); + } + } + + private: + size_t TotalSize(void) const { + size_t size = 0; + for (const Slice& data : slices_) { + size += data.size(); + } + return size; + } + + private: + size_t available_; + size_t slice_index_; + size_t slice_offset_; + const vector<Slice>& slices_; +}; + +class 
SnappyCodec : public CompressionCodec { + public: + static SnappyCodec *GetSingleton() { + return Singleton<SnappyCodec>::get(); + } + + Status Compress(const Slice& input, + uint8_t *compressed, size_t *compressed_length) const OVERRIDE { + snappy::RawCompress(reinterpret_cast<const char *>(input.data()), input.size(), + reinterpret_cast<char *>(compressed), compressed_length); + return Status::OK(); + } + + Status Compress(const vector<Slice>& input_slices, + uint8_t *compressed, size_t *compressed_length) const OVERRIDE { + SlicesSource source(input_slices); + snappy::UncheckedByteArraySink sink(reinterpret_cast<char *>(compressed)); + if ((*compressed_length = snappy::Compress(&source, &sink)) <= 0) { + return Status::Corruption("unable to compress the buffer"); + } + return Status::OK(); + } + + Status Uncompress(const Slice& compressed, + uint8_t *uncompressed, + size_t uncompressed_length) const OVERRIDE { + bool success = snappy::RawUncompress(reinterpret_cast<const char *>(compressed.data()), + compressed.size(), reinterpret_cast<char *>(uncompressed)); + return success ? 
Status::OK() : Status::Corruption("unable to uncompress the buffer"); + } + + size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE { + return snappy::MaxCompressedLength(source_bytes); + } + + CompressionType type() const override { + return SNAPPY; + } +}; + +class Lz4Codec : public CompressionCodec { + public: + static Lz4Codec *GetSingleton() { + return Singleton<Lz4Codec>::get(); + } + + Status Compress(const Slice& input, + uint8_t *compressed, size_t *compressed_length) const OVERRIDE { + int n = LZ4_compress(reinterpret_cast<const char *>(input.data()), + reinterpret_cast<char *>(compressed), input.size()); + *compressed_length = n; + return Status::OK(); + } + + Status Compress(const vector<Slice>& input_slices, + uint8_t *compressed, size_t *compressed_length) const OVERRIDE { + if (input_slices.size() == 1) { + return Compress(input_slices[0], compressed, compressed_length); + } + + SlicesSource source(input_slices); + faststring buffer; + source.Dump(&buffer); + return Compress(Slice(buffer.data(), buffer.size()), compressed, compressed_length); + } + + Status Uncompress(const Slice& compressed, + uint8_t *uncompressed, + size_t uncompressed_length) const OVERRIDE { + int n = LZ4_decompress_fast(reinterpret_cast<const char *>(compressed.data()), + reinterpret_cast<char *>(uncompressed), uncompressed_length); + if (n != compressed.size()) { + return Status::Corruption( + StringPrintf("unable to uncompress the buffer. error near %d, buffer", -n), + KUDU_REDACT(compressed.ToDebugString(100))); + } + return Status::OK(); + } + + size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE { + return LZ4_compressBound(source_bytes); + } + + CompressionType type() const override { + return LZ4; + } +}; + +/** + * TODO: use a instance-local Arena and pass alloc/free into zlib + * so that it allocates from the arena. 
+ */ +class ZlibCodec : public CompressionCodec { + public: + static ZlibCodec *GetSingleton() { + return Singleton<ZlibCodec>::get(); + } + + Status Compress(const Slice& input, + uint8_t *compressed, size_t *compressed_length) const OVERRIDE { + *compressed_length = MaxCompressedLength(input.size()); + int err = ::compress(compressed, compressed_length, input.data(), input.size()); + return err == Z_OK ? Status::OK() : Status::IOError("unable to compress the buffer"); + } + + Status Compress(const vector<Slice>& input_slices, + uint8_t *compressed, size_t *compressed_length) const OVERRIDE { + if (input_slices.size() == 1) { + return Compress(input_slices[0], compressed, compressed_length); + } + + // TODO: use z_stream + SlicesSource source(input_slices); + faststring buffer; + source.Dump(&buffer); + return Compress(Slice(buffer.data(), buffer.size()), compressed, compressed_length); + } + + Status Uncompress(const Slice& compressed, + uint8_t *uncompressed, size_t uncompressed_length) const OVERRIDE { + int err = ::uncompress(uncompressed, &uncompressed_length, + compressed.data(), compressed.size()); + return err == Z_OK ? 
Status::OK() : Status::Corruption("unable to uncompress the buffer"); + } + + size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE { + // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block + return source_bytes + (6 + (5 * ((source_bytes + 16383) >> 14))); + } + + CompressionType type() const override { + return ZLIB; + } +}; + +Status GetCompressionCodec(CompressionType compression, + const CompressionCodec** codec) { + switch (compression) { + case NO_COMPRESSION: + *codec = nullptr; + break; + case SNAPPY: + *codec = SnappyCodec::GetSingleton(); + break; + case LZ4: + *codec = Lz4Codec::GetSingleton(); + break; + case ZLIB: + *codec = ZlibCodec::GetSingleton(); + break; + default: + return Status::NotFound("bad compression type"); + } + return Status::OK(); +} + +CompressionType GetCompressionCodecType(const std::string& name) { + std::string uname; + ToUpperCase(name, &uname); + + if (uname == "SNAPPY") + return SNAPPY; + if (uname == "LZ4") + return LZ4; + if (uname == "ZLIB") + return ZLIB; + if (uname == "NONE") + return NO_COMPRESSION; + + LOG(WARNING) << "Unable to recognize the compression codec '" << name + << "' using no compression as default."; + return NO_COMPRESSION; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/compression/compression_codec.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/compression/compression_codec.h b/be/src/kudu/util/compression/compression_codec.h new file mode 100644 index 0000000..4f81fd3 --- /dev/null +++ b/be/src/kudu/util/compression/compression_codec.h @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_CFILE_COMPRESSION_CODEC_H +#define KUDU_CFILE_COMPRESSION_CODEC_H + +#include <cstddef> +#include <cstdint> +#include <string> +#include <vector> + +#include <snappy-stubs-public.h> + +#include "kudu/util/compression/compression.pb.h" +#include "kudu/util/slice.h" +#include "kudu/util/status.h" + +namespace kudu { + +class CompressionCodec { + public: + CompressionCodec(); + virtual ~CompressionCodec(); + + // REQUIRES: "compressed" must point to an area of memory that is at + // least "MaxCompressedLength(input_length)" bytes in length. + // + // Takes the data stored in "input[0..input_length]" and stores + // it in the array pointed to by "compressed". + // + // returns the length of the compressed output. 
+ virtual Status Compress(const Slice& input, + uint8_t *compressed, size_t *compressed_length) const = 0; + + virtual Status Compress(const std::vector<Slice>& input_slices, + uint8_t *compressed, size_t *compressed_length) const = 0; + + // Given data in "compressed[0..compressed_length-1]" generated by + // calling the Compress routine, this routine stores the uncompressed data + // to uncompressed[0..uncompressed_length-1] + // returns false if the message is corrupted and could not be uncompressed + virtual Status Uncompress(const Slice& compressed, + uint8_t *uncompressed, size_t uncompressed_length) const = 0; + + // Returns the maximal size of the compressed representation of + // input data that is "source_bytes" bytes in length. + virtual size_t MaxCompressedLength(size_t source_bytes) const = 0; + + // Return the type of compression implemented by this codec. + virtual CompressionType type() const = 0; + private: + DISALLOW_COPY_AND_ASSIGN(CompressionCodec); +}; + +// Returns the compression codec for the specified type. +// +// The returned codec is a singleton and should be not be destroyed. +Status GetCompressionCodec(CompressionType compression, + const CompressionCodec** codec); + +// Returns the compression codec type given the name +CompressionType GetCompressionCodecType(const std::string& name); + +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/condition_variable.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/condition_variable.cc b/be/src/kudu/util/condition_variable.cc new file mode 100644 index 0000000..369d20d --- /dev/null +++ b/be/src/kudu/util/condition_variable.cc @@ -0,0 +1,142 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
+ +#include "kudu/util/condition_variable.h" + +#include <sys/time.h> + +#include <cerrno> +#include <cstdint> +#include <ctime> +#include <ostream> + +#include <glog/logging.h> + +#include "kudu/util/monotime.h" +#include "kudu/util/mutex.h" +#include "kudu/util/thread_restrictions.h" + +namespace kudu { + +ConditionVariable::ConditionVariable(Mutex* user_lock) + : user_mutex_(&user_lock->native_handle_) +#if !defined(NDEBUG) + , user_lock_(user_lock) +#endif +{ + int rv = 0; +#if defined(__APPLE__) + rv = pthread_cond_init(&condition_, nullptr); +#else + // On Linux we can't use relative times like on macOS; reconfiguring the + // condition variable to use the monotonic clock means we can use support + // WaitFor with our MonoTime implementation. + pthread_condattr_t attrs; + rv = pthread_condattr_init(&attrs); + DCHECK_EQ(0, rv); + pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC); + rv = pthread_cond_init(&condition_, &attrs); + pthread_condattr_destroy(&attrs); +#endif + DCHECK_EQ(0, rv); +} + +ConditionVariable::~ConditionVariable() { + int rv = pthread_cond_destroy(&condition_); + DCHECK_EQ(0, rv); +} + +void ConditionVariable::Wait() const { + ThreadRestrictions::AssertWaitAllowed(); +#if !defined(NDEBUG) + user_lock_->CheckHeldAndUnmark(); +#endif + int rv = pthread_cond_wait(&condition_, user_mutex_); + DCHECK_EQ(0, rv); +#if !defined(NDEBUG) + user_lock_->CheckUnheldAndMark(); +#endif +} + +bool ConditionVariable::WaitUntil(const MonoTime& until) const { + ThreadRestrictions::AssertWaitAllowed(); + + // Have we already timed out? + MonoTime now = MonoTime::Now(); + if (now > until) { + return false; + } + +#if !defined(NDEBUG) + user_lock_->CheckHeldAndUnmark(); +#endif + +#if defined(__APPLE__) + // macOS does not provide a way to configure pthread_cond_timedwait() to use + // monotonic clocks, so we must convert the deadline into a delta and perform + // a relative wait. 
+ MonoDelta delta = until - now; + struct timespec relative_time; + delta.ToTimeSpec(&relative_time); + int rv = pthread_cond_timedwait_relative_np( + &condition_, user_mutex_, &relative_time); +#else + struct timespec absolute_time; + until.ToTimeSpec(&absolute_time); + int rv = pthread_cond_timedwait(&condition_, user_mutex_, &absolute_time); +#endif + DCHECK(rv == 0 || rv == ETIMEDOUT) + << "unexpected pthread_cond_timedwait return value: " << rv; + +#if !defined(NDEBUG) + user_lock_->CheckUnheldAndMark(); +#endif + return rv == 0; +} + +bool ConditionVariable::WaitFor(const MonoDelta& delta) const { + ThreadRestrictions::AssertWaitAllowed(); + + // Negative delta means we've already timed out. + int64_t nsecs = delta.ToNanoseconds(); + if (nsecs < 0) { + return false; + } + +#if !defined(NDEBUG) + user_lock_->CheckHeldAndUnmark(); +#endif + +#if defined(__APPLE__) + struct timespec relative_time; + delta.ToTimeSpec(&relative_time); + int rv = pthread_cond_timedwait_relative_np( + &condition_, user_mutex_, &relative_time); +#else + // The timeout argument to pthread_cond_timedwait is in absolute time. 
+ struct timespec absolute_time; + MonoTime deadline = MonoTime::Now() + delta; + deadline.ToTimeSpec(&absolute_time); + int rv = pthread_cond_timedwait(&condition_, user_mutex_, &absolute_time); +#endif + + DCHECK(rv == 0 || rv == ETIMEDOUT) + << "unexpected pthread_cond_timedwait return value: " << rv; +#if !defined(NDEBUG) + user_lock_->CheckUnheldAndMark(); +#endif + return rv == 0; +} + +void ConditionVariable::Broadcast() { + int rv = pthread_cond_broadcast(&condition_); + DCHECK_EQ(0, rv); +} + +void ConditionVariable::Signal() { + int rv = pthread_cond_signal(&condition_); + DCHECK_EQ(0, rv); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/condition_variable.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/condition_variable.h b/be/src/kudu/util/condition_variable.h new file mode 100644 index 0000000..1245646 --- /dev/null +++ b/be/src/kudu/util/condition_variable.h @@ -0,0 +1,118 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// ConditionVariable wraps pthreads condition variable synchronization or, on +// Windows, simulates it. This functionality is very helpful for having +// several threads wait for an event, as is common with a thread pool managed +// by a master. The meaning of such an event in the (worker) thread pool +// scenario is that additional tasks are now available for processing. It is +// used in Chrome in the DNS prefetching system to notify worker threads that +// a queue now has items (tasks) which need to be tended to. 
A related use
+// would have a pool manager waiting on a ConditionVariable, waiting for a
+// thread in the pool to announce (signal) that there is now more room in a
+// (bounded size) communications queue for the manager to deposit tasks, or,
+// as a second example, that the queue of tasks is completely empty and all
+// workers are waiting.
+//
+// USAGE NOTE 1: spurious signal events are possible with this and
+// most implementations of condition variables. As a result, be
+// *sure* to retest your condition before proceeding. The following
+// is a good example of doing this correctly:
+//
+// while (!work_to_be_done()) Wait(...);
+//
+// In contrast do NOT do the following:
+//
+// if (!work_to_be_done()) Wait(...); // Don't do this.
+//
+// Especially avoid the above if you are relying on some other thread only
+// issuing a signal *if* there is work to do. There can and will
+// be spurious signals. Recheck state on the waiting thread before
+// assuming the signal was intentional. Caveat caller ;-).
+//
+// USAGE NOTE 2: Broadcast() frees up all waiting threads at once,
+// which leads to contention for the locks they all held when they
+// called Wait(). This results in POOR performance. A much better
+// approach to getting a lot of threads out of Wait() is to have each
+// thread (upon exiting Wait()) call Signal() to free up another
+// Wait'ing thread. Chromium's condition_variable_unittest.cc has
+// examples of both.
+//
+// Broadcast() can be used nicely during teardown, as it gets the job
+// done, and leaves no sleeping threads... and performance is less
+// critical at that point.
+//
+// The semantics of Broadcast() are carefully crafted so that *all*
+// threads that were waiting when the request was made will indeed
+// get signaled. Some implementations mess up, and don't signal them
+// all, while others allow the wait to be effectively turned off (for
+// a while, as waiting threads come around).
This implementation
+// appears correct, as it will not "lose" any signals, and will guarantee
+// that all threads get signaled by Broadcast().
+//
+// Which waiting thread Signal() revives is not chosen by this class; it is
+// left to the underlying pthreads implementation. POSIX only specifies that
+// the scheduling policy determines the order in which blocked threads are
+// unblocked, so callers must not rely on either "fairness" (longest waiter
+// first) or "performance" (most recent waiter first) ordering.
+//
+// The Chromium original of this class documents further implementation
+// subtleties in a FAQ at the end of its condition_variable_win.cc.
+
+#ifndef BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_
+#define BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_
+
+#include <pthread.h>
+
+#include "kudu/gutil/macros.h"
+
+namespace kudu {
+
+class MonoDelta;
+class MonoTime;
+class Mutex;
+
+class ConditionVariable {
+ public:
+  // Construct a cv for use with ONLY one user lock. The cv keeps pointers
+  // into 'user_lock', so the lock must outlive it.
+  explicit ConditionVariable(Mutex* user_lock);
+
+  ~ConditionVariable();
+
+  // Wait() releases the caller's critical section atomically as it starts to
+  // sleep, and then reacquires it before returning. The caller must hold the
+  // user lock. Wakeups may be spurious, so always re-check the condition.
+  void Wait() const;
+
+  // Like Wait(), but only waits up to a certain point in time.
+  //
+  // Returns true if the thread was woken (by Signal(), Broadcast(), or
+  // spuriously), or false if 'until' was reached -- including when 'until'
+  // had already passed at the time of the call.
+  bool WaitUntil(const MonoTime& until) const;
+
+  // Like Wait(), but only waits up to a limited amount of time.
+  //
+  // Returns true if the thread was woken (by Signal(), Broadcast(), or
+  // spuriously), or false if 'delta' elapsed. A negative 'delta' returns
+  // false immediately.
+  bool WaitFor(const MonoDelta& delta) const;
+
+  // Broadcast() revives all waiting threads.
+  void Broadcast();
+  // Signal() revives one waiting thread.
+  void Signal();
+
+ private:
+
+  mutable pthread_cond_t condition_;
+  pthread_mutex_t* user_mutex_;  // Native handle of the user lock.
+
+#if !defined(NDEBUG)
+  Mutex* user_lock_; // Needed to adjust shadow lock state on wait.
+#endif
+
+  DISALLOW_COPY_AND_ASSIGN(ConditionVariable);
+};
+
+} // namespace kudu
+
+#endif // BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_ http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/countdown_latch-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/countdown_latch-test.cc b/be/src/kudu/util/countdown_latch-test.cc new file mode 100644 index 0000000..adb2623 --- /dev/null +++ b/be/src/kudu/util/countdown_latch-test.cc @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/bind.hpp> // IWYU pragma: keep
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadpool.h"
+
+namespace kudu {
+
+// Decrements 'latch' by 'amount', exercising the no-argument overload when
+// 'amount' is 1.
+static void DecrementLatch(CountDownLatch* latch, int amount) {
+  if (amount == 1) {
+    latch->CountDown();
+    return;
+  }
+  latch->CountDown(amount);
+}
+
+// Tests that we can decrement the latch by arbitrary amounts, as well
+// as one by one.
+TEST(TestCountDownLatch, TestLatch) {
+
+  gscoped_ptr<ThreadPool> pool;
+  ASSERT_OK(ThreadPoolBuilder("cdl-test").set_max_threads(1).Build(&pool));
+
+  CountDownLatch latch(1000);
+
+  // Decrement the count by 1 in another thread; this should not fire the
+  // latch.
+  ASSERT_OK(pool->SubmitFunc(boost::bind(DecrementLatch, &latch, 1)));
+  ASSERT_FALSE(latch.WaitFor(MonoDelta::FromMilliseconds(200)));
+  ASSERT_EQ(999, latch.count());
+
+  // Now decrement by 1000; this should decrement to 0 and fire the latch
+  // (even though 1000 is one more than the current count).
+  ASSERT_OK(pool->SubmitFunc(boost::bind(DecrementLatch, &latch, 1000)));
+  latch.Wait();
+  ASSERT_EQ(0, latch.count());
+}
+
+// Test that resetting to zero while there are waiters lets the waiters
+// continue.
+TEST(TestCountDownLatch, TestResetToZero) {
+  CountDownLatch cdl(100);
+  scoped_refptr<Thread> t;
+  ASSERT_OK(Thread::Create("test", "cdl-test", &CountDownLatch::Wait, &cdl, &t));
+
+  // Sleep for a bit until it's likely the other thread is waiting on the latch.
+  SleepFor(MonoDelta::FromMilliseconds(10));
+  cdl.Reset(0);
+  t->Join();
+}
+
+} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/countdown_latch.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/countdown_latch.h b/be/src/kudu/util/countdown_latch.h new file mode 100644 index 0000000..9a8000d --- /dev/null +++ b/be/src/kudu/util/countdown_latch.h @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_COUNTDOWN_LATCH_H
+#define KUDU_UTIL_COUNTDOWN_LATCH_H
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/thread_restrictions.h"
+
+namespace kudu {
+
+// This is a C++ implementation of the Java CountDownLatch
+// class.
+// See http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html
+class CountDownLatch {
+ public:
+  // Initialize the latch with the given initial count, which must be
+  // non-negative.
+  explicit CountDownLatch(int count)
+    : cond_(&lock_),
+      count_(count) {
+    // 'count_' is unsigned, so a negative 'count' would wrap to a huge value.
+    DCHECK_GE(count, 0);
+  }
+
+  // Decrement the count of this latch by 'amount' (which must be
+  // non-negative). The count is clamped at zero; if it reaches zero, all
+  // waiting threads are woken up. If the count is already zero, this has no
+  // effect.
+  void CountDown(int amount) {
+    DCHECK_GE(amount, 0);
+    MutexLock lock(lock_);
+    if (count_ == 0) {
+      return;
+    }
+
+    // Explicit conversion: 'count_' is unsigned while 'amount' is an int.
+    if (static_cast<uint64_t>(amount) >= count_) {
+      count_ = 0;
+    } else {
+      count_ -= amount;
+    }
+
+    if (count_ == 0) {
+      // Latch has triggered.
+      cond_.Broadcast();
+    }
+  }
+
+  // Decrement the count of this latch.
+  // If the new count is zero, then all waiting threads are woken up.
+  // If the count is already zero, this has no effect.
+  void CountDown() {
+    CountDown(1);
+  }
+
+  // Wait until the count on the latch reaches zero.
+  // If the count is already zero, this returns immediately.
+  void Wait() const {
+    ThreadRestrictions::AssertWaitAllowed();
+    MutexLock lock(lock_);
+    while (count_ > 0) {
+      cond_.Wait();
+    }
+  }
+
+  // Waits for the count on the latch to reach zero, or until time 'when' is
+  // reached. Returns true if the count is zero on return, false if 'when'
+  // passed while the count was still positive.
+  bool WaitUntil(const MonoTime& when) const {
+    ThreadRestrictions::AssertWaitAllowed();
+    MutexLock lock(lock_);
+    while (count_ > 0) {
+      if (!cond_.WaitUntil(when)) {
+        // Timed out, but 'lock_' is held again here: a CountDown() or Reset()
+        // that raced with the timeout may already have released the latch,
+        // so report the actual state instead of assuming failure.
+        return count_ == 0;
+      }
+    }
+    return true;
+  }
+
+  // Waits for the count on the latch to reach zero, or until 'delta' time elapses.
+  // Returns true if the count is zero on return, false otherwise.
+  bool WaitFor(const MonoDelta& delta) const {
+    return WaitUntil(MonoTime::Now() + delta);
+  }
+
+  // Reset the latch with the given count. This is equivalent to reconstructing
+  // the latch. If 'count' is 0, and there are currently waiters, those waiters
+  // will be triggered as if you counted down to 0.
+  void Reset(uint64_t count) {
+    MutexLock lock(lock_);
+    count_ = count;
+    if (count_ == 0) {
+      // Wake any waiters if we reset to 0.
+      cond_.Broadcast();
+    }
+  }
+
+  // Returns the current count.
+  uint64_t count() const {
+    MutexLock lock(lock_);
+    return count_;
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(CountDownLatch);
+  mutable Mutex lock_;
+  ConditionVariable cond_;
+
+  uint64_t count_;
+};
+
+// Utility class which calls latch->CountDown() in its destructor.
+class CountDownOnScopeExit {
+ public:
+  explicit CountDownOnScopeExit(CountDownLatch *latch) : latch_(latch) {}
+  ~CountDownOnScopeExit() {
+    latch_->CountDown();
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(CountDownOnScopeExit);
+
+  CountDownLatch *latch_;
+};
+
+} // namespace kudu
+#endif http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/cow_object.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/cow_object.cc b/be/src/kudu/util/cow_object.cc new file mode 100644 index 0000000..a22393c --- /dev/null +++ b/be/src/kudu/util/cow_object.cc @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/cow_object.h"
+
+using std::ostream;
+
+namespace kudu {
+
+// Writes the symbolic name of 'm' to 'o'; unrecognized values print "UNKNOWN".
+ostream& operator<<(ostream& o, LockMode m) {
+  switch (m) {
+    case LockMode::READ: return o << "READ";
+    case LockMode::WRITE: return o << "WRITE";
+    case LockMode::RELEASED: return o << "RELEASED";
+    default: return o << "UNKNOWN";
+  }
+}
+
+} // namespace kudu