http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/memory.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memory/memory.h b/be/src/kudu/util/memory/memory.h new file mode 100644 index 0000000..315631b --- /dev/null +++ b/be/src/kudu/util/memory/memory.h @@ -0,0 +1,970 @@ +// Copyright 2010 Google Inc. All Rights Reserved +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// +// Classes for memory management, used by materializations +// (arenas, segments, and STL collections parametrized via arena allocators) +// so that memory usage can be controlled at the application level. +// +// Materializations can be parametrized by specifying an instance of a +// BufferAllocator. The allocator implements +// memory management policy (e.g. setting allocation limits). Allocators may +// be shared between multiple materializations; e.g. you can designate a +// single allocator per a single user request, thus setting bounds on memory +// usage on a per-request basis. + +#ifndef KUDU_UTIL_MEMORY_MEMORY_H_ +#define KUDU_UTIL_MEMORY_MEMORY_H_ + +#include <algorithm> +#include <cstddef> +#include <cstdint> +#include <limits> +#include <memory> +#include <ostream> +#include <vector> + +#include <glog/logging.h> + +#include "kudu/util/boost_mutex_utils.h" +#include "kudu/util/memory/overwrite.h" +#include "kudu/util/mutex.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/singleton.h" + +namespace kudu { + +class BufferAllocator; +class MemTracker; + +// Wrapper for a block of data allocated by a BufferAllocator. Owns the block. +// (To release the block, destroy the buffer - it will then return it via the +// same allocator that has been used to create it). +class Buffer { + public: + ~Buffer(); + + void* data() const { return data_; } // The data buffer. + size_t size() const { return size_; } // In bytes. + + private: + friend class BufferAllocator; + + Buffer(void* data, size_t size, BufferAllocator* allocator) + : data_(CHECK_NOTNULL(data)), + size_(size), + allocator_(allocator) { +#ifndef NDEBUG + OverwriteWithPattern(reinterpret_cast<char*>(data_), size_, + "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW" + "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW" + "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW"); +#endif + } + + // Called by a successful realloc. + void Update(void* new_data, size_t new_size) { +#ifndef NDEBUG + if (new_size > size_) { + OverwriteWithPattern(reinterpret_cast<char*>(new_data) + size_, + new_size - size_, "NEW"); + } +#endif + data_ = new_data; + size_ = new_size; + } + + void* data_; + size_t size_; + BufferAllocator* const allocator_; + DISALLOW_COPY_AND_ASSIGN(Buffer); +}; + +// Allocators allow applications to control memory usage. They are +// used by materializations to allocate blocks of memory arenas. +// BufferAllocator is an abstract class that defines a common contract of +// all implementations of allocators. Specific allocators provide specific +// features, e.g. enforced resource limits, thread safety, etc. +class BufferAllocator { + public: + virtual ~BufferAllocator() {} + + // Called by the user when a new block of memory is needed. The 'requested' + // parameter specifies how much memory (in bytes) the user would like to get. + // The 'minimal' parameter specifies how much he is willing to settle for. + // The allocator returns a buffer sized in the range [minimal, requested], + // or NULL if the request can't be satisfied. When the buffer is destroyed, + // its destructor calls the FreeInternal() method on its allocator. + // CAVEAT: The allocator must outlive all buffers returned by it. + // + // Corner cases: + // 1. If requested == 0, the allocator will always return a non-NULL Buffer + // with a non-NULL data pointer and zero capacity. + // 2. If minimal == 0, the allocator will always return a non-NULL Buffer + // with a non-NULL data pointer, possibly with zero capacity. + Buffer* BestEffortAllocate(size_t requested, size_t minimal) { + DCHECK_LE(minimal, requested); + Buffer* result = AllocateInternal(requested, minimal, this); + LogAllocation(requested, minimal, result); + return result; + } + + // Called by the user when a new block of memory is needed. Equivalent to + // BestEffortAllocate(requested, requested). + Buffer* Allocate(size_t requested) { + return BestEffortAllocate(requested, requested); + } + + // Called by the user when a previously allocated block needs to be resized. + // Mimics semantics of <cstdlib> realloc. The 'requested' and 'minimal' + // represent the desired final buffer size, with semantics as in the Allocate. + // If the 'buffer' parameter is NULL, the call is equivalent to + // Allocate(requested, minimal). Otherwise, a reallocation of the buffer's + // data is attempted. On success, the original 'buffer' parameter is returned, + // but the buffer itself might have updated size and data. On failure, + // returns NULL, and leaves the input buffer unmodified. + // Reallocation might happen in-place, preserving the original data + // pointer, but it is not guaranteed - e.g. this function might degenerate to + // Allocate-Copy-Free. Either way, the content of the data buffer, up to the + // minimum of the new and old size, is preserved. + // + // Corner cases: + // 1. If requested == 0, the allocator will always return a non-NULL Buffer + // with a non-NULL data pointer and zero capacity. + // 2. If minimal == 0, the allocator will always return a non-NULL Buffer + // with a non-NULL data pointer, possibly with zero capacity. + Buffer* BestEffortReallocate(size_t requested, + size_t minimal, + Buffer* buffer) { + DCHECK_LE(minimal, requested); + Buffer* result; + if (buffer == NULL) { + result = AllocateInternal(requested, minimal, this); + LogAllocation(requested, minimal, result); + return result; + } else { + result = ReallocateInternal(requested, minimal, buffer, this) ? + buffer : NULL; + LogAllocation(requested, minimal, buffer); + return result; + } + } + + // Called by the user when a previously allocated block needs to be resized. + // Equivalent to BestEffortReallocate(requested, requested, buffer). + Buffer* Reallocate(size_t requested, Buffer* buffer) { + return BestEffortReallocate(requested, requested, buffer); + } + + // Returns the amount of memory (in bytes) still available for this allocator. + // For unbounded allocators (like raw HeapBufferAllocator) this is the highest + // size_t value possible. + // TODO(user): consider making pure virtual. + virtual size_t Available() const { return std::numeric_limits<size_t>::max(); } + + protected: + friend class Buffer; + + BufferAllocator() {} + + // Expose the constructor to subclasses of BufferAllocator. + Buffer* CreateBuffer(void* data, + size_t size, + BufferAllocator* allocator) { + return new Buffer(data, size, allocator); + } + + // Expose Buffer::Update to subclasses of BufferAllocator. + void UpdateBuffer(void* new_data, size_t new_size, Buffer* buffer) { + buffer->Update(new_data, new_size); + } + + // Called by chained buffer allocators. + Buffer* DelegateAllocate(BufferAllocator* delegate, + size_t requested, + size_t minimal, + BufferAllocator* originator) { + return delegate->AllocateInternal(requested, minimal, originator); + } + + // Called by chained buffer allocators. + bool DelegateReallocate(BufferAllocator* delegate, + size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) { + return delegate->ReallocateInternal(requested, minimal, buffer, originator); + } + + // Called by chained buffer allocators. + void DelegateFree(BufferAllocator* delegate, Buffer* buffer) { + delegate->FreeInternal(buffer); + } + + private: + // Implemented by concrete subclasses. + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) = 0; + + // Implemented by concrete subclasses. Returns false on failure. + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) = 0; + + // Implemented by concrete subclasses. + virtual void FreeInternal(Buffer* buffer) = 0; + + // Logs a warning message if the allocation failed or if it returned less than + // the required number of bytes. + void LogAllocation(size_t required, size_t minimal, Buffer* buffer); + + DISALLOW_COPY_AND_ASSIGN(BufferAllocator); +}; + +// Allocates buffers on the heap, with no memory limits. Uses standard C +// allocation functions (malloc, realloc, free). +class HeapBufferAllocator : public BufferAllocator { + public: + virtual ~HeapBufferAllocator() {} + + // Returns a singleton instance of the heap allocator. + static HeapBufferAllocator* Get() { + return Singleton<HeapBufferAllocator>::get(); + } + + virtual size_t Available() const OVERRIDE { + return std::numeric_limits<size_t>::max(); + } + + private: + // Allocates memory that is aligned to 16 way. + // Use if you want to boost SIMD operations on the memory area. + const bool aligned_mode_; + + friend class Singleton<HeapBufferAllocator>; + + // Always allocates 'requested'-sized buffer, or returns NULL on OOM. + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE; + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE; + + void* Malloc(size_t size); + void* Realloc(void* previousData, size_t previousSize, size_t newSize); + + virtual void FreeInternal(Buffer* buffer) OVERRIDE; + + HeapBufferAllocator(); + explicit HeapBufferAllocator(bool aligned_mode) + : aligned_mode_(aligned_mode) {} + + DISALLOW_COPY_AND_ASSIGN(HeapBufferAllocator); +}; + +// Wrapper around the delegate allocator, that clears all newly allocated +// (and reallocated) memory. +class ClearingBufferAllocator : public BufferAllocator { + public: + // Does not take ownership of the delegate. + explicit ClearingBufferAllocator(BufferAllocator* delegate) + : delegate_(delegate) {} + + virtual size_t Available() const OVERRIDE { + return delegate_->Available(); + } + + private: + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE; + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE; + + virtual void FreeInternal(Buffer* buffer) OVERRIDE; + + BufferAllocator* delegate_; + DISALLOW_COPY_AND_ASSIGN(ClearingBufferAllocator); +}; + +// Abstract policy for modifying allocation requests - e.g. enforcing quotas. +class Mediator { + public: + Mediator() {} + virtual ~Mediator() {} + + // Called by an allocator when a allocation request is processed. + // Must return a value in the range [minimal, requested], or zero. Returning + // zero (if minimal is non-zero) indicates denial to allocate. Returning + // non-zero indicates that the request should be capped at that value. + virtual size_t Allocate(size_t requested, size_t minimal) = 0; + + // Called by an allocator when the specified amount (in bytes) is released. + virtual void Free(size_t amount) = 0; + + // TODO(user): consider making pure virtual. + virtual size_t Available() const { return std::numeric_limits<size_t>::max(); } +}; + +// Optionally thread-safe skeletal implementation of a 'quota' abstraction, +// providing methods to allocate resources against the quota, and return them. +template<bool thread_safe> +class Quota : public Mediator { + public: + explicit Quota(bool enforced) : usage_(0), enforced_(enforced) {} + virtual ~Quota() {} + + // Returns a value in range [minimal, requested] if not exceeding remaining + // quota or if the quota is not enforced (soft quota), and adjusts the usage + // value accordingly. Otherwise, returns zero. The semantics of 'remaining + // quota' are defined by subclasses (that must supply GetQuotaInternal() + // method). + virtual size_t Allocate(size_t requested, size_t minimal) OVERRIDE; + + virtual void Free(size_t amount) OVERRIDE; + + // Returns memory still available in the quota. For unenforced Quota objects, + // you are still able to perform _minimal_ allocations when the available + // quota is 0 (or less than "minimal" param). + virtual size_t Available() const OVERRIDE { + lock_guard_maybe<Mutex> lock(Quota<thread_safe>::mutex()); + const size_t quota = GetQuotaInternal(); + return (usage_ >= quota) ? 0 : (quota - usage_); + } + + // Returns the current quota value. + size_t GetQuota() const; + + // Returns the current usage value, defined as a sum of all the values + // granted by calls to Allocate, less these released via calls to Free. + size_t GetUsage() const; + + bool enforced() const { + return enforced_; + } + + protected: + // Overridden by specific implementations, to define semantics of + // the quota, i.e. the total amount of resources that the mediator will + // allocate. Called directly from GetQuota that optionally provides + // thread safety. An 'Allocate' request will succeed if + // GetUsage() + minimal <= GetQuota() or if the quota is not enforced (soft + // quota). + virtual size_t GetQuotaInternal() const = 0; + + Mutex* mutex() const { return thread_safe ? &mutex_ : NULL; } + + private: + mutable Mutex mutex_; + size_t usage_; + bool enforced_; + DISALLOW_COPY_AND_ASSIGN(Quota); +}; + +// Optionally thread-safe static quota implementation (where quota is explicitly +// set to a concrete numeric value). +template<bool thread_safe> +class StaticQuota : public Quota<thread_safe> { + public: + explicit StaticQuota(size_t quota) + : Quota<thread_safe>(true) { + SetQuota(quota); + } + StaticQuota(size_t quota, bool enforced) + : Quota<thread_safe>(enforced) { + SetQuota(quota); + } + virtual ~StaticQuota() {} + + // Sets quota to the new value. + void SetQuota(const size_t quota); + + protected: + virtual size_t GetQuotaInternal() const { return quota_; } + + private: + size_t quota_; + DISALLOW_COPY_AND_ASSIGN(StaticQuota); +}; + +// Places resource limits on another allocator, using the specified Mediator +// (e.g. quota) implementation. +// +// If the mediator and the delegate allocator are thread-safe, this allocator +// is also thread-safe, to the extent that it will not introduce any +// state inconsistencies. However, without additional synchronization, +// allocation requests are not atomic end-to-end. This way, it is deadlock- +// resilient (even if you have cyclic relationships between allocators) and +// allows better concurrency. But, it may cause over-conservative +// allocations under memory contention, if you have multiple levels of +// mediating allocators. For example, if two requests that can't both be +// satisfied are submitted concurrently, it may happen that one of them succeeds +// but gets smaller buffer allocated than it would if the requests were strictly +// ordered. This is usually not a problem, however, as you don't really want to +// operate so close to memory limits that some of your allocations can't be +// satisfied. If you do have a simple, cascading graph of allocators though, +// and want to force requests be atomic end-to-end, put a +// ThreadSafeBufferAllocator at the entry point. +class MediatingBufferAllocator : public BufferAllocator { + public: + // Does not take ownership of the delegate, nor the mediator, allowing + // both to be reused. + MediatingBufferAllocator(BufferAllocator* const delegate, + Mediator* const mediator) + : delegate_(delegate), + mediator_(mediator) {} + + virtual ~MediatingBufferAllocator() {} + + virtual size_t Available() const OVERRIDE { + return std::min(delegate_->Available(), mediator_->Available()); + } + + private: + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE; + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE; + + virtual void FreeInternal(Buffer* buffer) OVERRIDE; + + BufferAllocator* delegate_; + Mediator* const mediator_; +}; + +// Convenience non-thread-safe static memory bounds enforcer. +// Combines MediatingBufferAllocator with a StaticQuota. +class MemoryLimit : public BufferAllocator { + public: + // Creates a limiter based on the default, heap allocator. Quota is infinite. + // (Can be set using SetQuota). + MemoryLimit() + : quota_(std::numeric_limits<size_t>::max()), + allocator_(HeapBufferAllocator::Get(), "a_) {} + + // Creates a limiter based on the default, heap allocator. + explicit MemoryLimit(size_t quota) + : quota_(quota), + allocator_(HeapBufferAllocator::Get(), "a_) {} + + // Creates a limiter relaying to the specified delegate allocator. + MemoryLimit(size_t quota, BufferAllocator* const delegate) + : quota_(quota), + allocator_(delegate, "a_) {} + + // Creates a (possibly non-enforcing) limiter relaying to the specified + // delegate allocator. + MemoryLimit(size_t quota, bool enforced, BufferAllocator* const delegate) + : quota_(quota, enforced), + allocator_(delegate, "a_) {} + + virtual ~MemoryLimit() {} + + virtual size_t Available() const OVERRIDE { + return allocator_.Available(); + } + + size_t GetQuota() const { return quota_.GetQuota(); } + size_t GetUsage() const { return quota_.GetUsage(); } + void SetQuota(const size_t quota) { quota_.SetQuota(quota); } + + private: + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE { + return DelegateAllocate(&allocator_, requested, minimal, originator); + } + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE { + return DelegateReallocate(&allocator_, requested, minimal, buffer, + originator); + } + virtual void FreeInternal(Buffer* buffer) OVERRIDE { + DelegateFree(&allocator_, buffer); + } + + StaticQuota<false> quota_; + MediatingBufferAllocator allocator_; +}; + +// An allocator that allows to bypass the (potential) soft quota below for a +// given amount of memory usage. The goal is to make the allocation methods and +// Available() work as if the allocator below had at least bypassed_amount of +// soft quota. Of course this class doesn't allow to exceed the hard quota. +class SoftQuotaBypassingBufferAllocator : public BufferAllocator { + public: + SoftQuotaBypassingBufferAllocator(BufferAllocator* allocator, + size_t bypassed_amount) + : allocator_(std::numeric_limits<size_t>::max(), allocator), + bypassed_amount_(bypassed_amount) {} + + virtual size_t Available() const OVERRIDE { + const size_t usage = allocator_.GetUsage(); + size_t available = allocator_.Available(); + if (bypassed_amount_ > usage) { + available = std::max(bypassed_amount_ - usage, available); + } + return available; + } + + private: + // Calculates how much to increase the minimal parameter to allocate more + // aggressively in the underlying allocator. This is to avoid getting only + // very small allocations when we exceed the soft quota below. The request + // with increased minimal size is more likely to fail because of exceeding + // hard quota, so we also fall back to the original minimal size. + size_t AdjustMinimal(size_t requested, size_t minimal) const { + return std::min(requested, std::max(minimal, Available())); + } + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE { + // Try increasing the "minimal" parameter to allocate more aggresively + // within the bypassed amount of soft quota. + Buffer* result = DelegateAllocate(&allocator_, + requested, + AdjustMinimal(requested, minimal), + originator); + if (result != NULL) { + return result; + } else { + return DelegateAllocate(&allocator_, + requested, + minimal, + originator); + } + } + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE { + if (DelegateReallocate(&allocator_, + requested, + AdjustMinimal(requested, minimal), + buffer, + originator)) { + return true; + } else { + return DelegateReallocate(&allocator_, + requested, + minimal, + buffer, + originator); + } + } + virtual void FreeInternal(Buffer* buffer) OVERRIDE { + DelegateFree(&allocator_, buffer); + } + + // Using MemoryLimit with "infinite" limit to get GetUsage(). + MemoryLimit allocator_; + size_t bypassed_amount_; +}; + +// An interface for a MemoryStatisticsCollector - an object which collects +// information about the memory usage of the allocator. The collector will +// gather statistics about memory usage based on information received from the +// allocator. +class MemoryStatisticsCollectorInterface { + public: + MemoryStatisticsCollectorInterface() {} + + virtual ~MemoryStatisticsCollectorInterface() {} + + // Informs the collector that the allocator granted bytes memory. Note that in + // the case of reallocation bytes should be the increase in total memory + // usage, not the total size of the buffer after reallocation. + virtual void AllocatedMemoryBytes(size_t bytes) = 0; + + // Informs the collector that the allocator received a request for at least + // bytes memory, and rejected it (meaning that it granted nothing). + virtual void RefusedMemoryBytes(size_t bytes) = 0; + + // Informs the collector that bytes memory have been released to the + // allocator. + virtual void FreedMemoryBytes(size_t bytes) = 0; + + private: + DISALLOW_COPY_AND_ASSIGN(MemoryStatisticsCollectorInterface); +}; + +class MemoryStatisticsCollectingBufferAllocator : public BufferAllocator { + public: + // Does not take ownership of the delegate. + // Takes ownership of memory_stats_collector. + MemoryStatisticsCollectingBufferAllocator( + BufferAllocator* const delegate, + MemoryStatisticsCollectorInterface* const memory_stats_collector) + : delegate_(delegate), + memory_stats_collector_(memory_stats_collector) {} + + virtual ~MemoryStatisticsCollectingBufferAllocator() {} + + virtual size_t Available() const OVERRIDE { + return delegate_->Available(); + } + + private: + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE; + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE; + + virtual void FreeInternal(Buffer* buffer) OVERRIDE; + + BufferAllocator* delegate_; + gscoped_ptr<MemoryStatisticsCollectorInterface> + memory_stats_collector_; +}; + +// BufferAllocator which uses MemTracker to keep track of and optionally +// (if a limit is set on the MemTracker) regulate memory consumption. +class MemoryTrackingBufferAllocator : public BufferAllocator { + public: + // Does not take ownership of the delegate. The delegate must remain + // valid for the lifetime of this allocator. Increments reference + // count for 'mem_tracker'. + // If 'mem_tracker' has a limit and 'enforce_limit' is true, then + // the classes calling this buffer allocator (whether directly, or + // through an Arena) must be able to handle the case when allocation + // fails. If 'enforce_limit' is false (this is the default), then + // allocation will always succeed. + MemoryTrackingBufferAllocator(BufferAllocator* const delegate, + std::shared_ptr<MemTracker> mem_tracker, + bool enforce_limit = false) + : delegate_(delegate), + mem_tracker_(std::move(mem_tracker)), + enforce_limit_(enforce_limit) {} + + virtual ~MemoryTrackingBufferAllocator() {} + + // If enforce limit is false, this always returns maximum possible value + // for int64_t (std::numeric_limits<int64_t>::max()). Otherwise, this + // is equivalent to calling mem_tracker_->SpareCapacity(); + virtual size_t Available() const OVERRIDE; + + private: + + // If enforce_limit_ is true, this is equivalent to calling + // mem_tracker_->TryConsume(bytes). If enforce_limit_ is false and + // mem_tracker_->TryConsume(bytes) is false, we call + // mem_tracker_->Consume(bytes) and always return true. + bool TryConsume(int64_t bytes); + + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE; + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE; + + virtual void FreeInternal(Buffer* buffer) OVERRIDE; + + BufferAllocator* delegate_; + std::shared_ptr<MemTracker> mem_tracker_; + bool enforce_limit_; +}; + +// Synchronizes access to AllocateInternal and FreeInternal, and exposes the +// mutex for use by subclasses. Allocation requests performed through this +// allocator are atomic end-to-end. Template parameter DelegateAllocatorType +// allows to specify a subclass of BufferAllocator for the delegate, to allow +// subclasses of ThreadSafeBufferAllocator to access additional methods provided +// by the allocator subclass. If this is not needed, it can be set to +// BufferAllocator. +template <class DelegateAllocatorType> +class ThreadSafeBufferAllocator : public BufferAllocator { + public: + // Does not take ownership of the delegate. + explicit ThreadSafeBufferAllocator(DelegateAllocatorType* delegate) + : delegate_(delegate) {} + virtual ~ThreadSafeBufferAllocator() {} + + virtual size_t Available() const OVERRIDE { + lock_guard_maybe<Mutex> lock(mutex()); + return delegate()->Available(); + } + + protected: + Mutex* mutex() const { return &mutex_; } + // Expose the delegate allocator, with the precise type of the allocator + // specified by the template parameter. The delegate() methods themselves + // don't give any thread-safety guarantees. Protect all uses taking the Mutex + // exposed by the mutex() method. + DelegateAllocatorType* delegate() { return delegate_; } + const DelegateAllocatorType* delegate() const { return delegate_; } + + private: + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE { + lock_guard_maybe<Mutex> lock(mutex()); + return DelegateAllocate(delegate(), requested, minimal, originator); + } + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE { + lock_guard_maybe<Mutex> lock(mutex()); + return DelegateReallocate(delegate(), requested, minimal, buffer, + originator); + } + + virtual void FreeInternal(Buffer* buffer) OVERRIDE { + lock_guard_maybe<Mutex> lock(mutex()); + DelegateFree(delegate(), buffer); + } + + DelegateAllocatorType* delegate_; + mutable Mutex mutex_; + DISALLOW_COPY_AND_ASSIGN(ThreadSafeBufferAllocator); +}; + +// A version of ThreadSafeBufferAllocator that owns the supplied delegate +// allocator. +template <class DelegateAllocatorType> +class OwningThreadSafeBufferAllocator + : public ThreadSafeBufferAllocator<DelegateAllocatorType> { + public: + explicit OwningThreadSafeBufferAllocator(DelegateAllocatorType* delegate) + : ThreadSafeBufferAllocator<DelegateAllocatorType>(delegate), + delegate_owned_(delegate) {} + virtual ~OwningThreadSafeBufferAllocator() {} + + private: + gscoped_ptr<DelegateAllocatorType> delegate_owned_; +}; + +class ThreadSafeMemoryLimit + : public OwningThreadSafeBufferAllocator<MemoryLimit> { + public: + ThreadSafeMemoryLimit(size_t quota, bool enforced, + BufferAllocator* const delegate) + : OwningThreadSafeBufferAllocator<MemoryLimit>( + new MemoryLimit(quota, enforced, delegate)) {} + virtual ~ThreadSafeMemoryLimit() {} + + size_t GetQuota() const { + lock_guard_maybe<Mutex> lock(mutex()); + return delegate()->GetQuota(); + } + size_t GetUsage() const { + lock_guard_maybe<Mutex> lock(mutex()); + return delegate()->GetUsage(); + } + void SetQuota(const size_t quota) { + lock_guard_maybe<Mutex> lock(mutex()); + delegate()->SetQuota(quota); + } +}; + +// A BufferAllocator that can be given ownership of many objects of given type. +// These objects will then be deleted when the buffer allocator is destroyed. +// The objects added last are deleted first (LIFO). +template <typename OwnedType> +class OwningBufferAllocator : public BufferAllocator { + public: + // Doesn't take ownership of delegate. + explicit OwningBufferAllocator(BufferAllocator* const delegate) + : delegate_(delegate) {} + + virtual ~OwningBufferAllocator() { + // Delete elements starting from the end. + while (!owned_.empty()) { + OwnedType* p = owned_.back(); + owned_.pop_back(); + delete p; + } + } + + // Add to the collection of objects owned by this allocator. The object added + // last is deleted first. + OwningBufferAllocator* Add(OwnedType* p) { + owned_.push_back(p); + return this; + } + + virtual size_t Available() const OVERRIDE { + return delegate_->Available(); + } + + private: + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE { + return DelegateAllocate(delegate_, requested, minimal, originator); + } + + virtual bool ReallocateInternal(size_t requested, + size_t minimal, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE { + return DelegateReallocate(delegate_, requested, minimal, buffer, + originator); + } + + virtual void FreeInternal(Buffer* buffer) OVERRIDE { + DelegateFree(delegate_, buffer); + } + + // Not using PointerVector here because we want to guarantee certain order of + // deleting elements (starting from the ones added last). + std::vector<OwnedType*> owned_; + BufferAllocator* delegate_; +}; + +// Buffer allocator that tries to guarantee the exact and consistent amount +// of memory. Uses hard MemoryLimit to enforce the upper bound but also +// guarantees consistent allocations by ignoring minimal requested amounts and +// always returning the full amount of memory requested if available. +// Allocations will fail if the memory requested would exceed the quota or if +// the underlying allocator fails to provide the memory. +class GuaranteeMemory : public BufferAllocator { + public: + // Doesn't take ownership of 'delegate'. + GuaranteeMemory(size_t memory_quota, + BufferAllocator* delegate) + : limit_(memory_quota, true, delegate), + memory_guarantee_(memory_quota) {} + + virtual size_t Available() const OVERRIDE { + return memory_guarantee_ - limit_.GetUsage(); + } + + private: + virtual Buffer* AllocateInternal(size_t requested, + size_t minimal, + BufferAllocator* originator) OVERRIDE { + if (requested > Available()) { + return NULL; + } else { + return DelegateAllocate(&limit_, requested, requested, originator); + } + } + + virtual bool ReallocateInternal(size_t requested, + size_t /* minimal */, + Buffer* buffer, + BufferAllocator* originator) OVERRIDE { + int64_t additional_memory = requested - (buffer != NULL ? buffer->size() : 0); + return additional_memory <= static_cast<int64_t>(Available()) + && DelegateReallocate(&limit_, requested, requested, + buffer, originator); + } + + virtual void FreeInternal(Buffer* buffer) OVERRIDE { + DelegateFree(&limit_, buffer); + } + + MemoryLimit limit_; + size_t memory_guarantee_; + DISALLOW_COPY_AND_ASSIGN(GuaranteeMemory); +}; + +// Implementation of inline and template methods + +template<bool thread_safe> +size_t Quota<thread_safe>::Allocate(const size_t requested, + const size_t minimal) { + lock_guard_maybe<Mutex> lock(mutex()); + DCHECK_LE(minimal, requested) + << "\"minimal\" shouldn't be bigger than \"requested\""; + const size_t quota = GetQuotaInternal(); + size_t allocation; + if (usage_ > quota || minimal > quota - usage_) { + // OOQ (Out of quota). + if (!enforced() && minimal <= std::numeric_limits<size_t>::max() - usage_) { + // The quota is unenforced and the value of "minimal" won't cause an + // overflow. Perform a minimal allocation. + allocation = minimal; + } else { + allocation = 0; + } + LOG(WARNING) << "Out of quota. Requested: " << requested + << " bytes, or at least minimal: " << minimal + << ". Current quota value is: " << quota + << " while current usage is: " << usage_ + << ". The quota is " << (enforced() ? "" : "not ") + << "enforced. " + << ((allocation == 0) ? "Did not allocate any memory." + : "Allocated the minimal value requested."); + } else { + allocation = std::min(requested, quota - usage_); + } + usage_ += allocation; + return allocation; +} + +template<bool thread_safe> +void Quota<thread_safe>::Free(size_t amount) { + lock_guard_maybe<Mutex> lock(mutex()); + usage_ -= amount; + // threads allocate/free memory concurrently via the same Quota object that is + // not protected with a mutex (thread_safe == false). + if (usage_ > (std::numeric_limits<size_t>::max() - (1 << 28))) { + LOG(ERROR) << "Suspiciously big usage_ value: " << usage_ + << " (could be a result size_t wrapping around below 0, " + << "for example as a result of race condition)."; + } +} + +template<bool thread_safe> +size_t Quota<thread_safe>::GetQuota() const { + lock_guard_maybe<Mutex> lock(mutex()); + return GetQuotaInternal(); +} + +template<bool thread_safe> +size_t Quota<thread_safe>::GetUsage() const { + lock_guard_maybe<Mutex> lock(mutex()); + return usage_; +} + +template<bool thread_safe> +void StaticQuota<thread_safe>::SetQuota(const size_t quota) { + lock_guard_maybe<Mutex> lock(Quota<thread_safe>::mutex()); + quota_ = quota; +} + +} // namespace kudu + +#endif // KUDU_UTIL_MEMORY_MEMORY_H_
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/overwrite.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memory/overwrite.cc b/be/src/kudu/util/memory/overwrite.cc new file mode 100644 index 0000000..cca9227 --- /dev/null +++ b/be/src/kudu/util/memory/overwrite.cc @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/memory/overwrite.h" + +#include "kudu/gutil/strings/stringpiece.h" + +#include <string.h> +#include <glog/logging.h> +namespace kudu { + +void OverwriteWithPattern(char* p, size_t len, StringPiece pattern) { + size_t pat_len = pattern.size(); + CHECK_LT(0, pat_len); + size_t rem = len; + const char *pat_ptr = pattern.data(); + + for (; rem >= pat_len; rem -= pat_len) { + memcpy(p, pat_ptr, pat_len); + p += pat_len; + } + + for (; rem > 0; rem--) { + *p++ = *pat_ptr++; + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/overwrite.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memory/overwrite.h b/be/src/kudu/util/memory/overwrite.h new file mode 100644 index 0000000..37c6512 --- /dev/null +++ b/be/src/kudu/util/memory/overwrite.h @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_MEMORY_OVERWRITE_H +#define KUDU_MEMORY_OVERWRITE_H + +#include <cstddef> + +#include "kudu/gutil/strings/stringpiece.h" + +namespace kudu { + +// Overwrite 'p' with enough repetitions of 'pattern' to fill 'len' +// bytes. This is optimized at -O3 even in debug builds, so is +// reasonably efficient to use. +void OverwriteWithPattern(char* p, size_t len, StringPiece pattern); + +} // namespace kudu +#endif /* KUDU_MEMORY_OVERWRITE_H */ + http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/metrics-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/metrics-test.cc b/be/src/kudu/util/metrics-test.cc new file mode 100644 index 0000000..f8776aa --- /dev/null +++ b/be/src/kudu/util/metrics-test.cc @@ -0,0 +1,388 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <cstdint> +#include <ostream> +#include <string> +#include <unordered_set> +#include <vector> + +#include <gflags/gflags_declare.h> +#include <glog/logging.h> +#include <gtest/gtest.h> +#include <rapidjson/document.h> + +#include "kudu/gutil/bind.h" +#include "kudu/gutil/bind_helpers.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/hdr_histogram.h" +#include "kudu/util/jsonreader.h" +#include "kudu/util/jsonwriter.h" +#include "kudu/util/metrics.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" + +using std::string; +using std::unordered_set; +using std::vector; + +DECLARE_int32(metrics_retirement_age_ms); + +namespace kudu { + +METRIC_DEFINE_entity(test_entity); + +class MetricsTest : public KuduTest { + public: + void SetUp() override { + KuduTest::SetUp(); + + entity_ = METRIC_ENTITY_test_entity.Instantiate(®istry_, "my-test"); + } + + protected: + MetricRegistry registry_; + scoped_refptr<MetricEntity> entity_; +}; + +METRIC_DEFINE_counter(test_entity, test_counter, "My Test Counter", MetricUnit::kRequests, + "Description of test counter"); + +TEST_F(MetricsTest, SimpleCounterTest) { + scoped_refptr<Counter> requests = + new Counter(&METRIC_test_counter); + ASSERT_EQ("Description of test counter", requests->prototype()->description()); + ASSERT_EQ(0, requests->value()); + requests->Increment(); + ASSERT_EQ(1, requests->value()); + requests->IncrementBy(2); + ASSERT_EQ(3, requests->value()); +} + +METRIC_DEFINE_gauge_uint64(test_entity, test_gauge, "Test uint64 Gauge", + MetricUnit::kBytes, "Description of Test Gauge"); + +TEST_F(MetricsTest, SimpleAtomicGaugeTest) { + scoped_refptr<AtomicGauge<uint64_t> > mem_usage = + METRIC_test_gauge.Instantiate(entity_, 0); + ASSERT_EQ(METRIC_test_gauge.description(), mem_usage->prototype()->description()); + ASSERT_EQ(0, mem_usage->value()); + mem_usage->IncrementBy(7); + ASSERT_EQ(7, mem_usage->value()); + mem_usage->set_value(5); + ASSERT_EQ(5, mem_usage->value()); +} + +METRIC_DEFINE_gauge_int64(test_entity, test_func_gauge, "Test Function Gauge", + MetricUnit::kBytes, "Test Gauge 2"); + +static int64_t MyFunction(int* metric_val) { + return (*metric_val)++; +} + +TEST_F(MetricsTest, SimpleFunctionGaugeTest) { + int metric_val = 1000; + scoped_refptr<FunctionGauge<int64_t> > gauge = + METRIC_test_func_gauge.InstantiateFunctionGauge( + entity_, Bind(&MyFunction, Unretained(&metric_val))); + + ASSERT_EQ(1000, gauge->value()); + ASSERT_EQ(1001, gauge->value()); + + gauge->DetachToCurrentValue(); + // After detaching, it should continue to return the same constant value. + ASSERT_EQ(1002, gauge->value()); + ASSERT_EQ(1002, gauge->value()); + + // Test resetting to a constant. + gauge->DetachToConstant(2); + ASSERT_EQ(2, gauge->value()); +} + +TEST_F(MetricsTest, AutoDetachToLastValue) { + int metric_val = 1000; + scoped_refptr<FunctionGauge<int64_t> > gauge = + METRIC_test_func_gauge.InstantiateFunctionGauge( + entity_, Bind(&MyFunction, Unretained(&metric_val))); + + ASSERT_EQ(1000, gauge->value()); + ASSERT_EQ(1001, gauge->value()); + { + FunctionGaugeDetacher detacher; + gauge->AutoDetachToLastValue(&detacher); + ASSERT_EQ(1002, gauge->value()); + ASSERT_EQ(1003, gauge->value()); + } + + ASSERT_EQ(1004, gauge->value()); + ASSERT_EQ(1004, gauge->value()); +} + +TEST_F(MetricsTest, AutoDetachToConstant) { + int metric_val = 1000; + scoped_refptr<FunctionGauge<int64_t> > gauge = + METRIC_test_func_gauge.InstantiateFunctionGauge( + entity_, Bind(&MyFunction, Unretained(&metric_val))); + + ASSERT_EQ(1000, gauge->value()); + ASSERT_EQ(1001, gauge->value()); + { + FunctionGaugeDetacher detacher; + gauge->AutoDetach(&detacher, 12345); + ASSERT_EQ(1002, gauge->value()); + ASSERT_EQ(1003, gauge->value()); + } + + ASSERT_EQ(12345, gauge->value()); +} + +METRIC_DEFINE_gauge_uint64(test_entity, counter_as_gauge, "Gauge exposed as Counter", + MetricUnit::kBytes, "Gauge exposed as Counter", + EXPOSE_AS_COUNTER); +TEST_F(MetricsTest, TEstExposeGaugeAsCounter) { + ASSERT_EQ(MetricType::kCounter, METRIC_counter_as_gauge.type()); +} + +METRIC_DEFINE_histogram(test_entity, test_hist, "Test Histogram", + MetricUnit::kMilliseconds, "foo", 1000000, 3); + +TEST_F(MetricsTest, SimpleHistogramTest) { + scoped_refptr<Histogram> hist = METRIC_test_hist.Instantiate(entity_); + hist->Increment(2); + hist->IncrementBy(4, 1); + ASSERT_EQ(2, hist->histogram_->MinValue()); + ASSERT_EQ(3, hist->histogram_->MeanValue()); + ASSERT_EQ(4, hist->histogram_->MaxValue()); + ASSERT_EQ(2, hist->histogram_->TotalCount()); + ASSERT_EQ(6, hist->histogram_->TotalSum()); + // TODO: Test coverage needs to be improved a lot. +} + +TEST_F(MetricsTest, JsonPrintTest) { + scoped_refptr<Counter> test_counter = METRIC_test_counter.Instantiate(entity_); + test_counter->Increment(); + entity_->SetAttribute("test_attr", "attr_val"); + + // Generate the JSON. + std::ostringstream out; + JsonWriter writer(&out, JsonWriter::PRETTY); + ASSERT_OK(entity_->WriteAsJson(&writer, { "*" }, MetricJsonOptions())); + + // Now parse it back out. + JsonReader reader(out.str()); + ASSERT_OK(reader.Init()); + + vector<const rapidjson::Value*> metrics; + ASSERT_OK(reader.ExtractObjectArray(reader.root(), "metrics", &metrics)); + ASSERT_EQ(1, metrics.size()); + string metric_name; + ASSERT_OK(reader.ExtractString(metrics[0], "name", &metric_name)); + ASSERT_EQ("test_counter", metric_name); + int64_t metric_value; + ASSERT_OK(reader.ExtractInt64(metrics[0], "value", &metric_value)); + ASSERT_EQ(1L, metric_value); + + const rapidjson::Value* attributes; + ASSERT_OK(reader.ExtractObject(reader.root(), "attributes", &attributes)); + string attr_value; + ASSERT_OK(reader.ExtractString(attributes, "test_attr", &attr_value)); + ASSERT_EQ("attr_val", attr_value); + + // Verify that metric filtering matches on substrings. + out.str(""); + ASSERT_OK(entity_->WriteAsJson(&writer, { "test count" }, MetricJsonOptions())); + ASSERT_STR_CONTAINS(METRIC_test_counter.name(), out.str()); + + // Verify that, if we filter for a metric that isn't in this entity, we get no result. + out.str(""); + ASSERT_OK(entity_->WriteAsJson(&writer, { "not_a_matching_metric" }, MetricJsonOptions())); + ASSERT_EQ("", out.str()); + + // Verify that filtering is case-insensitive. + out.str(""); + ASSERT_OK(entity_->WriteAsJson(&writer, { "mY teST coUNteR" }, MetricJsonOptions())); + ASSERT_STR_CONTAINS(METRIC_test_counter.name(), out.str()); +} + +// Test that metrics are retired when they are no longer referenced. +TEST_F(MetricsTest, RetirementTest) { + FLAGS_metrics_retirement_age_ms = 100; + + const string kMetricName = "foo"; + scoped_refptr<Counter> counter = METRIC_test_counter.Instantiate(entity_); + ASSERT_EQ(1, entity_->UnsafeMetricsMapForTests().size()); + + // Since we hold a reference to the counter, it should not get retired. + entity_->RetireOldMetrics(); + ASSERT_EQ(1, entity_->UnsafeMetricsMapForTests().size()); + + // When we de-ref it, it should not get immediately retired, either, because + // we keep retirable metrics around for some amount of time. We try retiring + // a number of times to hit all the cases. + counter = nullptr; + for (int i = 0; i < 3; i++) { + entity_->RetireOldMetrics(); + ASSERT_EQ(1, entity_->UnsafeMetricsMapForTests().size()); + } + + // If we wait for longer than the retirement time, and call retire again, we'll + // actually retire it. + SleepFor(MonoDelta::FromMilliseconds(FLAGS_metrics_retirement_age_ms * 1.5)); + entity_->RetireOldMetrics(); + ASSERT_EQ(0, entity_->UnsafeMetricsMapForTests().size()); +} + +TEST_F(MetricsTest, TestRetiringEntities) { + ASSERT_EQ(1, registry_.num_entities()); + + // Drop the reference to our entity. + entity_.reset(); + + // Retire metrics. Since there is nothing inside our entity, it should + // retire immediately (no need to loop). + registry_.RetireOldMetrics(); + + ASSERT_EQ(0, registry_.num_entities()); +} + +// Test that we can mark a metric to never be retired. +TEST_F(MetricsTest, NeverRetireTest) { + entity_->NeverRetire(METRIC_test_hist.Instantiate(entity_)); + FLAGS_metrics_retirement_age_ms = 0; + + for (int i = 0; i < 3; i++) { + entity_->RetireOldMetrics(); + ASSERT_EQ(1, entity_->UnsafeMetricsMapForTests().size()); + } +} + +TEST_F(MetricsTest, TestInstantiatingTwice) { + // Test that re-instantiating the same entity ID returns the same object. + scoped_refptr<MetricEntity> new_entity = METRIC_ENTITY_test_entity.Instantiate( + ®istry_, entity_->id()); + ASSERT_EQ(new_entity.get(), entity_.get()); +} + +TEST_F(MetricsTest, TestInstantiatingDifferentEntities) { + scoped_refptr<MetricEntity> new_entity = METRIC_ENTITY_test_entity.Instantiate( + ®istry_, "some other ID"); + ASSERT_NE(new_entity.get(), entity_.get()); +} + +TEST_F(MetricsTest, TestDumpJsonPrototypes) { + // Dump the prototype info. + std::ostringstream out; + JsonWriter w(&out, JsonWriter::PRETTY); + MetricPrototypeRegistry::get()->WriteAsJson(&w); + string json = out.str(); + + // Quick sanity check for one of our metrics defined in this file. + const char* expected = + " {\n" + " \"name\": \"test_func_gauge\",\n" + " \"label\": \"Test Function Gauge\",\n" + " \"type\": \"gauge\",\n" + " \"unit\": \"bytes\",\n" + " \"description\": \"Test Gauge 2\",\n" + " \"entity_type\": \"test_entity\"\n" + " }"; + ASSERT_STR_CONTAINS(json, expected); + + // Parse it. + rapidjson::Document d; + d.Parse<0>(json.c_str()); + + // Ensure that we got a reasonable number of metrics. + int num_metrics = d["metrics"].Size(); + int num_entities = d["entities"].Size(); + LOG(INFO) << "Parsed " << num_metrics << " metrics and " << num_entities << " entities"; + ASSERT_GT(num_metrics, 5); + ASSERT_EQ(num_entities, 2); + + // Spot-check that some metrics were properly registered and that the JSON was properly + // formed. + unordered_set<string> seen_metrics; + for (int i = 0; i < d["metrics"].Size(); i++) { + InsertOrDie(&seen_metrics, d["metrics"][i]["name"].GetString()); + } + ASSERT_TRUE(ContainsKey(seen_metrics, "threads_started")); + ASSERT_TRUE(ContainsKey(seen_metrics, "test_hist")); +} + +TEST_F(MetricsTest, TestDumpOnlyChanged) { + auto GetJson = [&](int64_t since_epoch) { + MetricJsonOptions opts; + opts.only_modified_in_or_after_epoch = since_epoch; + std::ostringstream out; + JsonWriter writer(&out, JsonWriter::COMPACT); + CHECK_OK(entity_->WriteAsJson(&writer, { "*" }, opts)); + return out.str(); + }; + + scoped_refptr<Counter> test_counter = METRIC_test_counter.Instantiate(entity_); + + int64_t epoch_when_modified = Metric::current_epoch(); + test_counter->Increment(); + + // If we pass a "since dirty" epoch from before we incremented it, we should + // see the metric. + for (int i = 0; i < 2; i++) { + ASSERT_STR_CONTAINS(GetJson(epoch_when_modified), "{\"name\":\"test_counter\",\"value\":1}"); + Metric::IncrementEpoch(); + } + + // If we pass a current epoch, we should see that the metric was not modified. + int64_t new_epoch = Metric::current_epoch(); + ASSERT_STR_NOT_CONTAINS(GetJson(new_epoch), "test_counter"); + // ... until we modify it again. + test_counter->Increment(); + ASSERT_STR_CONTAINS(GetJson(new_epoch), "{\"name\":\"test_counter\",\"value\":2}"); +} + + +// Test that 'include_untouched_metrics=false' prevents dumping counters and histograms +// which have never been incremented. +TEST_F(MetricsTest, TestDontDumpUntouched) { + // Instantiate a bunch of metrics. + int metric_val = 1000; + scoped_refptr<Counter> test_counter = METRIC_test_counter.Instantiate(entity_); + scoped_refptr<Histogram> hist = METRIC_test_hist.Instantiate(entity_); + scoped_refptr<FunctionGauge<int64_t> > function_gauge = + METRIC_test_func_gauge.InstantiateFunctionGauge( + entity_, Bind(&MyFunction, Unretained(&metric_val))); + scoped_refptr<AtomicGauge<uint64_t> > atomic_gauge = + METRIC_test_gauge.Instantiate(entity_, 0); + + MetricJsonOptions opts; + opts.include_untouched_metrics = false; + std::ostringstream out; + JsonWriter writer(&out, JsonWriter::COMPACT); + CHECK_OK(entity_->WriteAsJson(&writer, { "*" }, opts)); + // Untouched counters and histograms should not be included. + ASSERT_STR_NOT_CONTAINS(out.str(), "test_counter"); + ASSERT_STR_NOT_CONTAINS(out.str(), "test_hist"); + // Untouched gauges need to be included, because we don't actually + // track whether they have been touched. + ASSERT_STR_CONTAINS(out.str(), "test_func_gauge"); + ASSERT_STR_CONTAINS(out.str(), "test_gauge"); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/metrics.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/metrics.cc b/be/src/kudu/util/metrics.cc new file mode 100644 index 0000000..dc30360 --- /dev/null +++ b/be/src/kudu/util/metrics.cc @@ -0,0 +1,746 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "kudu/util/metrics.h" + +#include <iostream> +#include <map> +#include <utility> + +#include <gflags/gflags.h> +#include <glog/logging.h> + +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/singleton.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/hdr_histogram.h" +#include "kudu/util/histogram.pb.h" +#include "kudu/util/status.h" +#include "kudu/util/string_case.h" + +DEFINE_int32(metrics_retirement_age_ms, 120 * 1000, + "The minimum number of milliseconds a metric will be kept for after it is " + "no longer active. (Advanced option)"); +TAG_FLAG(metrics_retirement_age_ms, runtime); +TAG_FLAG(metrics_retirement_age_ms, advanced); + +// Process/server-wide metrics should go into the 'server' entity. +// More complex applications will define other entities. +METRIC_DEFINE_entity(server); + +namespace kudu { + +using std::string; +using std::vector; +using strings::Substitute; + +// +// MetricUnit +// + +const char* MetricUnit::Name(Type unit) { + switch (unit) { + case kCacheHits: + return "hits"; + case kCacheQueries: + return "queries"; + case kBytes: + return "bytes"; + case kRequests: + return "requests"; + case kEntries: + return "entries"; + case kRows: + return "rows"; + case kCells: + return "cells"; + case kConnections: + return "connections"; + case kOperations: + return "operations"; + case kProbes: + return "probes"; + case kNanoseconds: + return "nanoseconds"; + case kMicroseconds: + return "microseconds"; + case kMilliseconds: + return "milliseconds"; + case kSeconds: + return "seconds"; + case kThreads: + return "threads"; + case kTransactions: + return "transactions"; + case kUnits: + return "units"; + case kScanners: + return "scanners"; + case kMaintenanceOperations: + return "operations"; + case kBlocks: + return "blocks"; + case kHoles: + return "holes"; + case kLogBlockContainers: + return "log block containers"; + case kTasks: + return "tasks"; + case kMessages: + return "messages"; + case kContextSwitches: + return "context switches"; + case kDataDirectories: + return "data directories"; + case kState: + return "state"; + case kSessions: + return "sessions"; + case kTablets: + return "tablets"; + default: + DCHECK(false) << "Unknown unit with type = " << unit; + return "UNKNOWN UNIT"; + } +} + +// +// MetricType +// + +const char* const MetricType::kGaugeType = "gauge"; +const char* const MetricType::kCounterType = "counter"; +const char* const MetricType::kHistogramType = "histogram"; +const char* MetricType::Name(MetricType::Type type) { + switch (type) { + case kGauge: + return kGaugeType; + case kCounter: + return kCounterType; + case kHistogram: + return kHistogramType; + default: + return "UNKNOWN TYPE"; + } +} + +// +// MetricEntityPrototype +// + +MetricEntityPrototype::MetricEntityPrototype(const char* name) + : name_(name) { + MetricPrototypeRegistry::get()->AddEntity(this); +} + +MetricEntityPrototype::~MetricEntityPrototype() { +} + +scoped_refptr<MetricEntity> MetricEntityPrototype::Instantiate( + MetricRegistry* registry, + const std::string& id, + const MetricEntity::AttributeMap& initial_attrs) const { + return registry->FindOrCreateEntity(this, id, initial_attrs); +} + + +// +// MetricEntity +// + +MetricEntity::MetricEntity(const MetricEntityPrototype* prototype, + std::string id, AttributeMap attributes) + : prototype_(prototype), + id_(std::move(id)), + attributes_(std::move(attributes)), + published_(true) {} + +MetricEntity::~MetricEntity() { +} + +void MetricEntity::CheckInstantiation(const MetricPrototype* proto) const { + CHECK_STREQ(prototype_->name(), proto->entity_type()) + << "Metric " << proto->name() << " may not be instantiated entity of type " + << prototype_->name() << " (expected: " << proto->entity_type() << ")"; +} + +scoped_refptr<Metric> MetricEntity::FindOrNull(const MetricPrototype& prototype) const { + std::lock_guard<simple_spinlock> l(lock_); + return FindPtrOrNull(metric_map_, &prototype); +} + +namespace { + +bool MatchMetricInList(const string& metric_name, + const vector<string>& match_params) { + string metric_name_uc; + ToUpperCase(metric_name, &metric_name_uc); + + for (const string& param : match_params) { + // Handle wildcard. + if (param == "*") return true; + // The parameter is a case-insensitive substring match of the metric name. + string param_uc; + ToUpperCase(param, ¶m_uc); + if (metric_name_uc.find(param_uc) != std::string::npos) { + return true; + } + } + return false; +} + +} // anonymous namespace + + +Status MetricEntity::WriteAsJson(JsonWriter* writer, + const vector<string>& requested_metrics, + const MetricJsonOptions& opts) const { + bool select_all = MatchMetricInList(id(), requested_metrics); + + // We want the keys to be in alphabetical order when printing, so we use an ordered map here. + typedef std::map<const char*, scoped_refptr<Metric> > OrderedMetricMap; + OrderedMetricMap metrics; + AttributeMap attrs; + { + // Snapshot the metrics in this registry (not guaranteed to be a consistent snapshot) + std::lock_guard<simple_spinlock> l(lock_); + attrs = attributes_; + for (const MetricMap::value_type& val : metric_map_) { + const MetricPrototype* prototype = val.first; + const scoped_refptr<Metric>& metric = val.second; + + if (select_all || MatchMetricInList(prototype->name(), requested_metrics)) { + InsertOrDie(&metrics, prototype->name(), metric); + } + } + } + + // If we had a filter, and we didn't either match this entity or any metrics inside + // it, don't print the entity at all. + if (!requested_metrics.empty() && !select_all && metrics.empty()) { + return Status::OK(); + } + + writer->StartObject(); + + writer->String("type"); + writer->String(prototype_->name()); + + writer->String("id"); + writer->String(id_); + + if (opts.include_entity_attributes) { + writer->String("attributes"); + writer->StartObject(); + for (const AttributeMap::value_type& val : attrs) { + writer->String(val.first); + writer->String(val.second); + } + writer->EndObject(); + } + + writer->String("metrics"); + writer->StartArray(); + for (OrderedMetricMap::value_type& val : metrics) { + const auto& m = val.second; + if (m->ModifiedInOrAfterEpoch(opts.only_modified_in_or_after_epoch)) { + if (!opts.include_untouched_metrics && m->IsUntouched()) { + continue; + } + WARN_NOT_OK(m->WriteAsJson(writer, opts), + strings::Substitute("Failed to write $0 as JSON", val.first)); + } + } + writer->EndArray(); + + writer->EndObject(); + + return Status::OK(); +} + +void MetricEntity::RetireOldMetrics() { + MonoTime now(MonoTime::Now()); + + std::lock_guard<simple_spinlock> l(lock_); + for (auto it = metric_map_.begin(); it != metric_map_.end();) { + const scoped_refptr<Metric>& metric = it->second; + + if (PREDICT_TRUE(!metric->HasOneRef() && published_)) { + // The metric is still in use. Note that, in the case of "NeverRetire()", the metric + // will have a ref-count of 2 because it is reffed by the 'never_retire_metrics_' + // collection. + + // Ensure that it is not marked for later retirement (this could happen in the case + // that a metric is un-reffed and then re-reffed later by looking it up from the + // registry). + metric->retire_time_ = MonoTime(); + ++it; + continue; + } + + if (!metric->retire_time_.Initialized()) { + VLOG(3) << "Metric " << it->first << " has become un-referenced or unpublished. " + << "Will retire after the retention interval"; + // This is the first time we've seen this metric as retirable. + metric->retire_time_ = + now + MonoDelta::FromMilliseconds(FLAGS_metrics_retirement_age_ms); + ++it; + continue; + } + + // If we've already seen this metric in a previous scan, check if it's + // time to retire it yet. + if (now < metric->retire_time_) { + VLOG(3) << "Metric " << it->first << " is un-referenced, but still within " + << "the retention interval"; + ++it; + continue; + } + + + VLOG(2) << "Retiring metric " << it->first; + metric_map_.erase(it++); + } +} + +void MetricEntity::NeverRetire(const scoped_refptr<Metric>& metric) { + std::lock_guard<simple_spinlock> l(lock_); + never_retire_metrics_.push_back(metric); +} + +void MetricEntity::SetAttributes(const AttributeMap& attrs) { + std::lock_guard<simple_spinlock> l(lock_); + attributes_ = attrs; +} + +void MetricEntity::SetAttribute(const string& key, const string& val) { + std::lock_guard<simple_spinlock> l(lock_); + attributes_[key] = val; +} + +// +// MetricRegistry +// + +MetricRegistry::MetricRegistry() { +} + +MetricRegistry::~MetricRegistry() { +} + +Status MetricRegistry::WriteAsJson(JsonWriter* writer, + const vector<string>& requested_metrics, + const MetricJsonOptions& opts) const { + EntityMap entities; + { + std::lock_guard<simple_spinlock> l(lock_); + entities = entities_; + } + + writer->StartArray(); + for (const auto& e : entities) { + WARN_NOT_OK(e.second->WriteAsJson(writer, requested_metrics, opts), + Substitute("Failed to write entity $0 as JSON", e.second->id())); + } + writer->EndArray(); + + // Rather than having a thread poll metrics periodically to retire old ones, + // we'll just retire them here. The only downside is that, if no one is polling + // metrics, we may end up leaving them around indefinitely; however, metrics are + // small, and one might consider it a feature: if monitoring stops polling for + // metrics, we should keep them around until the next poll. + entities.clear(); // necessary to deref metrics we just dumped before doing retirement scan. + const_cast<MetricRegistry*>(this)->RetireOldMetrics(); + return Status::OK(); +} + +void MetricRegistry::RetireOldMetrics() { + std::lock_guard<simple_spinlock> l(lock_); + for (auto it = entities_.begin(); it != entities_.end();) { + it->second->RetireOldMetrics(); + + if (it->second->num_metrics() == 0 && + (it->second->HasOneRef() || !it->second->published())) { + // This entity has no metrics and either has no more external references or has + // been marked as unpublished, so we can remove it. + // Unlike retiring the metrics themselves, we don't wait for any timeout + // to retire them -- we assume that that timed retention has been satisfied + // by holding onto the metrics inside the entity. + entities_.erase(it++); + } else { + ++it; + } + } +} + +// +// MetricPrototypeRegistry +// +MetricPrototypeRegistry* MetricPrototypeRegistry::get() { + return Singleton<MetricPrototypeRegistry>::get(); +} + +void MetricPrototypeRegistry::AddMetric(const MetricPrototype* prototype) { + std::lock_guard<simple_spinlock> l(lock_); + metrics_.push_back(prototype); +} + +void MetricPrototypeRegistry::AddEntity(const MetricEntityPrototype* prototype) { + std::lock_guard<simple_spinlock> l(lock_); + entities_.push_back(prototype); +} + +void MetricPrototypeRegistry::WriteAsJson(JsonWriter* writer) const { + std::lock_guard<simple_spinlock> l(lock_); + MetricJsonOptions opts; + opts.include_schema_info = true; + writer->StartObject(); + + // Dump metric prototypes. + writer->String("metrics"); + writer->StartArray(); + for (const MetricPrototype* p : metrics_) { + writer->StartObject(); + p->WriteFields(writer, opts); + writer->String("entity_type"); + writer->String(p->entity_type()); + writer->EndObject(); + } + writer->EndArray(); + + // Dump entity prototypes. + writer->String("entities"); + writer->StartArray(); + for (const MetricEntityPrototype* p : entities_) { + writer->StartObject(); + writer->String("name"); + writer->String(p->name()); + writer->EndObject(); + } + writer->EndArray(); + + writer->EndObject(); +} + +void MetricPrototypeRegistry::WriteAsJson() const { + std::ostringstream s; + JsonWriter w(&s, JsonWriter::PRETTY); + WriteAsJson(&w); + std::cout << s.str() << std::endl; +} + +// +// MetricPrototype +// +MetricPrototype::MetricPrototype(CtorArgs args) : args_(args) { + MetricPrototypeRegistry::get()->AddMetric(this); +} + +void MetricPrototype::WriteFields(JsonWriter* writer, + const MetricJsonOptions& opts) const { + writer->String("name"); + writer->String(name()); + + if (opts.include_schema_info) { + writer->String("label"); + writer->String(label()); + + writer->String("type"); + writer->String(MetricType::Name(type())); + + writer->String("unit"); + writer->String(MetricUnit::Name(unit())); + + writer->String("description"); + writer->String(description()); + } +} + +// +// FunctionGaugeDetacher +// + +FunctionGaugeDetacher::FunctionGaugeDetacher() { +} + +FunctionGaugeDetacher::~FunctionGaugeDetacher() { + for (const Closure& c : callbacks_) { + c.Run(); + } +} + +scoped_refptr<MetricEntity> MetricRegistry::FindOrCreateEntity( + const MetricEntityPrototype* prototype, + const std::string& id, + const MetricEntity::AttributeMap& initial_attributes) { + std::lock_guard<simple_spinlock> l(lock_); + scoped_refptr<MetricEntity> e = FindPtrOrNull(entities_, id); + if (!e) { + e = new MetricEntity(prototype, id, initial_attributes); + InsertOrDie(&entities_, id, e); + } else if (!e->published()) { + e = new MetricEntity(prototype, id, initial_attributes); + entities_[id] = e; + } else { + e->SetAttributes(initial_attributes); + } + return e; +} + +// +// Metric +// + +std::atomic<int64_t> Metric::g_epoch_; + +Metric::Metric(const MetricPrototype* prototype) + : prototype_(prototype), + m_epoch_(current_epoch()) { +} + +Metric::~Metric() { +} + +void Metric::IncrementEpoch() { + g_epoch_++; +} + +void Metric::UpdateModificationEpochSlowPath() { + int64_t new_epoch, old_epoch; + // CAS loop to ensure that we never transition a metric's epoch backwards + // even if multiple threads race to update it. + do { + old_epoch = m_epoch_; + new_epoch = g_epoch_; + } while (old_epoch < new_epoch && + !m_epoch_.compare_exchange_weak(old_epoch, new_epoch)); +} + +// +// Gauge +// + +Status Gauge::WriteAsJson(JsonWriter* writer, + const MetricJsonOptions& opts) const { + writer->StartObject(); + + prototype_->WriteFields(writer, opts); + + writer->String("value"); + WriteValue(writer); + + writer->EndObject(); + return Status::OK(); +} + +// +// StringGauge +// + +StringGauge::StringGauge(const GaugePrototype<string>* proto, + string initial_value) + : Gauge(proto), value_(std::move(initial_value)) {} + +std::string StringGauge::value() const { + std::lock_guard<simple_spinlock> l(lock_); + return value_; +} + +void StringGauge::set_value(const std::string& value) { + UpdateModificationEpoch(); + std::lock_guard<simple_spinlock> l(lock_); + value_ = value; +} + +void StringGauge::WriteValue(JsonWriter* writer) const { + writer->String(value()); +} + +// +// Counter +// +// This implementation is optimized by using a striped counter. See LongAdder for details. + +scoped_refptr<Counter> CounterPrototype::Instantiate(const scoped_refptr<MetricEntity>& entity) { + return entity->FindOrCreateCounter(this); +} + +Counter::Counter(const CounterPrototype* proto) : Metric(proto) { +} + +int64_t Counter::value() const { + return value_.Value(); +} + +void Counter::Increment() { + IncrementBy(1); +} + +void Counter::IncrementBy(int64_t amount) { + UpdateModificationEpoch(); + value_.IncrementBy(amount); +} + +Status Counter::WriteAsJson(JsonWriter* writer, + const MetricJsonOptions& opts) const { + writer->StartObject(); + + prototype_->WriteFields(writer, opts); + + writer->String("value"); + writer->Int64(value()); + + writer->EndObject(); + return Status::OK(); +} + +///////////////////////////////////////////////// +// HistogramPrototype +///////////////////////////////////////////////// + +HistogramPrototype::HistogramPrototype(const MetricPrototype::CtorArgs& args, + uint64_t max_trackable_value, int num_sig_digits) + : MetricPrototype(args), + max_trackable_value_(max_trackable_value), + num_sig_digits_(num_sig_digits) { + // Better to crash at definition time that at instantiation time. + CHECK(HdrHistogram::IsValidHighestTrackableValue(max_trackable_value)) + << Substitute("Invalid max trackable value on histogram $0: $1", + args.name_, max_trackable_value); + CHECK(HdrHistogram::IsValidNumSignificantDigits(num_sig_digits)) + << Substitute("Invalid number of significant digits on histogram $0: $1", + args.name_, num_sig_digits); +} + +scoped_refptr<Histogram> HistogramPrototype::Instantiate( + const scoped_refptr<MetricEntity>& entity) { + return entity->FindOrCreateHistogram(this); +} + +///////////////////////////////////////////////// +// Histogram +///////////////////////////////////////////////// + +Histogram::Histogram(const HistogramPrototype* proto) + : Metric(proto), + histogram_(new HdrHistogram(proto->max_trackable_value(), proto->num_sig_digits())) { +} + +void Histogram::Increment(int64_t value) { + UpdateModificationEpoch(); + histogram_->Increment(value); +} + +void Histogram::IncrementBy(int64_t value, int64_t amount) { + UpdateModificationEpoch(); + histogram_->IncrementBy(value, amount); +} + +Status Histogram::WriteAsJson(JsonWriter* writer, + const MetricJsonOptions& opts) const { + + HistogramSnapshotPB snapshot; + RETURN_NOT_OK(GetHistogramSnapshotPB(&snapshot, opts)); + writer->Protobuf(snapshot); + return Status::OK(); +} + +Status Histogram::GetHistogramSnapshotPB(HistogramSnapshotPB* snapshot_pb, + const MetricJsonOptions& opts) const { + snapshot_pb->set_name(prototype_->name()); + if (opts.include_schema_info) { + snapshot_pb->set_type(MetricType::Name(prototype_->type())); + snapshot_pb->set_label(prototype_->label()); + snapshot_pb->set_unit(MetricUnit::Name(prototype_->unit())); + snapshot_pb->set_description(prototype_->description()); + snapshot_pb->set_max_trackable_value(histogram_->highest_trackable_value()); + snapshot_pb->set_num_significant_digits(histogram_->num_significant_digits()); + } + // Fast-path for a reasonably common case of an empty histogram. This occurs + // when a histogram is tracking some information about a feature not in + // use, for example. + if (histogram_->TotalCount() == 0) { + snapshot_pb->set_total_count(0); + snapshot_pb->set_total_sum(0); + snapshot_pb->set_min(0); + snapshot_pb->set_mean(0); + snapshot_pb->set_percentile_75(0); + snapshot_pb->set_percentile_95(0); + snapshot_pb->set_percentile_99(0); + snapshot_pb->set_percentile_99_9(0); + snapshot_pb->set_percentile_99_99(0); + snapshot_pb->set_max(0); + } else { + HdrHistogram snapshot(*histogram_); + snapshot_pb->set_total_count(snapshot.TotalCount()); + snapshot_pb->set_total_sum(snapshot.TotalSum()); + snapshot_pb->set_min(snapshot.MinValue()); + snapshot_pb->set_mean(snapshot.MeanValue()); + snapshot_pb->set_percentile_75(snapshot.ValueAtPercentile(75)); + snapshot_pb->set_percentile_95(snapshot.ValueAtPercentile(95)); + snapshot_pb->set_percentile_99(snapshot.ValueAtPercentile(99)); + snapshot_pb->set_percentile_99_9(snapshot.ValueAtPercentile(99.9)); + snapshot_pb->set_percentile_99_99(snapshot.ValueAtPercentile(99.99)); + snapshot_pb->set_max(snapshot.MaxValue()); + + if (opts.include_raw_histograms) { + RecordedValuesIterator iter(&snapshot); + while (iter.HasNext()) { + HistogramIterationValue value; + RETURN_NOT_OK(iter.Next(&value)); + snapshot_pb->add_values(value.value_iterated_to); + snapshot_pb->add_counts(value.count_at_value_iterated_to); + } + } + } + return Status::OK(); +} + +uint64_t Histogram::CountInBucketForValueForTests(uint64_t value) const { + return histogram_->CountInBucketForValue(value); +} + +uint64_t Histogram::TotalCount() const { + return histogram_->TotalCount(); +} + +uint64_t Histogram::MinValueForTests() const { + return histogram_->MinValue(); +} + +uint64_t Histogram::MaxValueForTests() const { + return histogram_->MaxValue(); +} +double Histogram::MeanValueForTests() const { + return histogram_->MeanValue(); +} + +ScopedLatencyMetric::ScopedLatencyMetric(Histogram* latency_hist) + : latency_hist_(latency_hist) { + if (latency_hist_) { + time_started_ = MonoTime::Now(); + } +} + +ScopedLatencyMetric::~ScopedLatencyMetric() { + if (latency_hist_ != nullptr) { + MonoTime time_now = MonoTime::Now(); + latency_hist_->Increment((time_now - time_started_).ToMicroseconds()); + } +} + +} // namespace kudu