http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/memory.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/memory.h b/be/src/kudu/util/memory/memory.h
new file mode 100644
index 0000000..315631b
--- /dev/null
+++ b/be/src/kudu/util/memory/memory.h
@@ -0,0 +1,970 @@
+// Copyright 2010 Google Inc.  All Rights Reserved
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+//
+// Classes for memory management, used by materializations
+// (arenas, segments, and STL collections parametrized via arena allocators)
+// so that memory usage can be controlled at the application level.
+//
+// Materializations can be parametrized by specifying an instance of a
+// BufferAllocator. The allocator implements
+// memory management policy (e.g. setting allocation limits). Allocators may
+// be shared between multiple materializations; e.g. you can designate a
+// single allocator per user request, thus setting bounds on memory
+// usage on a per-request basis.
+
+#ifndef KUDU_UTIL_MEMORY_MEMORY_H_
+#define KUDU_UTIL_MEMORY_MEMORY_H_
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <ostream>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/util/boost_mutex_utils.h"
+#include "kudu/util/memory/overwrite.h"
+#include "kudu/util/mutex.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/singleton.h"
+
+namespace kudu {
+
+class BufferAllocator;
+class MemTracker;
+
+// Wrapper for a block of data allocated by a BufferAllocator. Owns the block.
+// (To release the block, destroy the buffer - it will then return it via the
+// same allocator that has been used to create it).
+class Buffer {
+ public:
+  ~Buffer();
+
+  void* data() const { return data_; }   // The data buffer.
+  size_t size() const { return size_; }  // In bytes.
+
+ private:
+  friend class BufferAllocator;
+
+  Buffer(void* data, size_t size, BufferAllocator* allocator)
+      : data_(CHECK_NOTNULL(data)),
+        size_(size),
+        allocator_(allocator) {
+#ifndef NDEBUG
+    OverwriteWithPattern(reinterpret_cast<char*>(data_), size_,
+                         "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW"
+                         "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW"
+                         "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW");
+#endif
+  }
+
+  // Called by a successful realloc.
+  void Update(void* new_data, size_t new_size) {
+#ifndef NDEBUG
+    if (new_size > size_) {
+      OverwriteWithPattern(reinterpret_cast<char*>(new_data) + size_,
+                           new_size - size_, "NEW");
+    }
+#endif
+    data_ = new_data;
+    size_ = new_size;
+  }
+
+  void* data_;
+  size_t size_;
+  BufferAllocator* const allocator_;
+  DISALLOW_COPY_AND_ASSIGN(Buffer);
+};
+
+// Allocators allow applications to control memory usage. They are
+// used by materializations to allocate blocks of memory for arenas.
+// BufferAllocator is an abstract class that defines a common contract of
+// all implementations of allocators. Specific allocators provide specific
+// features, e.g. enforced resource limits, thread safety, etc.
+class BufferAllocator {
+ public:
+  virtual ~BufferAllocator() {}
+
+  // Called by the user when a new block of memory is needed. The 'requested'
+  // parameter specifies how much memory (in bytes) the user would like to get.
+  // The 'minimal' parameter specifies how much the caller is willing to settle for.
+  // The allocator returns a buffer sized in the range [minimal, requested],
+  // or NULL if the request can't be satisfied. When the buffer is destroyed,
+  // its destructor calls the FreeInternal() method on its allocator.
+  // CAVEAT: The allocator must outlive all buffers returned by it.
+  //
+  // Corner cases:
+  // 1. If requested == 0, the allocator will always return a non-NULL Buffer
+  //    with a non-NULL data pointer and zero capacity.
+  // 2. If minimal == 0, the allocator will always return a non-NULL Buffer
+  //    with a non-NULL data pointer, possibly with zero capacity.
+  Buffer* BestEffortAllocate(size_t requested, size_t minimal) {
+    DCHECK_LE(minimal, requested);
+    Buffer* result = AllocateInternal(requested, minimal, this);
+    LogAllocation(requested, minimal, result);
+    return result;
+  }
+
+  // Called by the user when a new block of memory is needed. Equivalent to
+  // BestEffortAllocate(requested, requested).
+  Buffer* Allocate(size_t requested) {
+    return BestEffortAllocate(requested, requested);
+  }
+
+  // Called by the user when a previously allocated block needs to be resized.
+  // Mimics semantics of <cstdlib> realloc. The 'requested' and 'minimal'
+  // represent the desired final buffer size, with semantics as in Allocate.
+  // If the 'buffer' parameter is NULL, the call is equivalent to
+  // BestEffortAllocate(requested, minimal). Otherwise, a reallocation of the
+  // buffer's data is attempted. On success, the original 'buffer' parameter is returned,
+  // but the buffer itself might have updated size and data. On failure,
+  // returns NULL, and leaves the input buffer unmodified.
+  // Reallocation might happen in-place, preserving the original data
+  // pointer, but it is not guaranteed - e.g. this function might degenerate to
+  // Allocate-Copy-Free. Either way, the content of the data buffer, up to the
+  // minimum of the new and old size, is preserved.
+  //
+  // Corner cases:
+  // 1. If requested == 0, the allocator will always return a non-NULL Buffer
+  //    with a non-NULL data pointer and zero capacity.
+  // 2. If minimal == 0, the allocator will always return a non-NULL Buffer
+  //    with a non-NULL data pointer, possibly with zero capacity.
+  Buffer* BestEffortReallocate(size_t requested,
+                               size_t minimal,
+                               Buffer* buffer) {
+    DCHECK_LE(minimal, requested);
+    Buffer* result;
+    if (buffer == NULL) {
+      result = AllocateInternal(requested, minimal, this);
+      LogAllocation(requested, minimal, result);
+      return result;
+    } else {
+      result = ReallocateInternal(requested, minimal, buffer, this) ?
+          buffer : NULL;
+      LogAllocation(requested, minimal, buffer);
+      return result;
+    }
+  }
+
+  // Called by the user when a previously allocated block needs to be resized.
+  // Equivalent to BestEffortReallocate(requested, requested, buffer).
+  Buffer* Reallocate(size_t requested, Buffer* buffer) {
+    return BestEffortReallocate(requested, requested, buffer);
+  }
+
+  // Returns the amount of memory (in bytes) still available for this allocator.
+  // For unbounded allocators (like raw HeapBufferAllocator) this is the highest
+  // size_t value possible.
+  // TODO(user): consider making pure virtual.
+  virtual size_t Available() const { return std::numeric_limits<size_t>::max(); }
+
+ protected:
+  friend class Buffer;
+
+  BufferAllocator() {}
+
+  // Expose the constructor to subclasses of BufferAllocator.
+  Buffer* CreateBuffer(void* data,
+                       size_t size,
+                       BufferAllocator* allocator) {
+    return new Buffer(data, size, allocator);
+  }
+
+  // Expose Buffer::Update to subclasses of BufferAllocator.
+  void UpdateBuffer(void* new_data, size_t new_size, Buffer* buffer) {
+    buffer->Update(new_data, new_size);
+  }
+
+  // Called by chained buffer allocators.
+  Buffer* DelegateAllocate(BufferAllocator* delegate,
+                           size_t requested,
+                           size_t minimal,
+                           BufferAllocator* originator) {
+    return delegate->AllocateInternal(requested, minimal, originator);
+  }
+
+  // Called by chained buffer allocators.
+  bool DelegateReallocate(BufferAllocator* delegate,
+                          size_t requested,
+                          size_t minimal,
+                          Buffer* buffer,
+                          BufferAllocator* originator) {
+    return delegate->ReallocateInternal(requested, minimal, buffer,
+                                        originator);
+  }
+
+  // Called by chained buffer allocators.
+  void DelegateFree(BufferAllocator* delegate, Buffer* buffer) {
+    delegate->FreeInternal(buffer);
+  }
+
+ private:
+  // Implemented by concrete subclasses.
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) = 0;
+
+  // Implemented by concrete subclasses. Returns false on failure.
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) = 0;
+
+  // Implemented by concrete subclasses.
+  virtual void FreeInternal(Buffer* buffer) = 0;
+
+  // Logs a warning message if the allocation failed or if it returned less than
+  // the required number of bytes.
+  void LogAllocation(size_t required, size_t minimal, Buffer* buffer);
+
+  DISALLOW_COPY_AND_ASSIGN(BufferAllocator);
+};
+
+// Allocates buffers on the heap, with no memory limits. Uses standard C
+// allocation functions (malloc, realloc, free).
+class HeapBufferAllocator : public BufferAllocator {
+ public:
+  virtual ~HeapBufferAllocator() {}
+
+  // Returns a singleton instance of the heap allocator.
+  static HeapBufferAllocator* Get() {
+    return Singleton<HeapBufferAllocator>::get();
+  }
+
+  virtual size_t Available() const OVERRIDE {
+    return std::numeric_limits<size_t>::max();
+  }
+
+ private:
+  // If true, allocates memory aligned to a 16-byte boundary.
+  // Use if you want to boost SIMD operations on the memory area.
+  const bool aligned_mode_;
+
+  friend class Singleton<HeapBufferAllocator>;
+
+  // Always allocates 'requested'-sized buffer, or returns NULL on OOM.
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE;
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE;
+
+  void* Malloc(size_t size);
+  void* Realloc(void* previousData, size_t previousSize, size_t newSize);
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE;
+
+  HeapBufferAllocator();
+  explicit HeapBufferAllocator(bool aligned_mode)
+      : aligned_mode_(aligned_mode) {}
+
+  DISALLOW_COPY_AND_ASSIGN(HeapBufferAllocator);
+};
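+
+// Illustrative usage sketch of the allocation contract above (the sizes are
+// arbitrary example values):
+//
+//   BufferAllocator* allocator = HeapBufferAllocator::Get();
+//   Buffer* buffer = allocator->BestEffortAllocate(1024 * 1024,  // requested
+//                                                  64 * 1024);   // minimal
+//   if (buffer != NULL) {
+//     // buffer->size() is somewhere in [64 KB, 1 MB].
+//     memset(buffer->data(), 0, buffer->size());
+//     delete buffer;  // Returns the memory via the allocator that created it.
+//   }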
+
+// Wrapper around the delegate allocator, that clears all newly allocated
+// (and reallocated) memory.
+class ClearingBufferAllocator : public BufferAllocator {
+ public:
+  // Does not take ownership of the delegate.
+  explicit ClearingBufferAllocator(BufferAllocator* delegate)
+      : delegate_(delegate) {}
+
+  virtual size_t Available() const OVERRIDE {
+    return delegate_->Available();
+  }
+
+ private:
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE;
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE;
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE;
+
+  BufferAllocator* delegate_;
+  DISALLOW_COPY_AND_ASSIGN(ClearingBufferAllocator);
+};
+
+// Abstract policy for modifying allocation requests - e.g. enforcing quotas.
+class Mediator {
+ public:
+  Mediator() {}
+  virtual ~Mediator() {}
+
+  // Called by an allocator when an allocation request is processed.
+  // Must return a value in the range [minimal, requested], or zero. Returning
+  // zero (if minimal is non-zero) indicates denial to allocate. Returning
+  // non-zero indicates that the request should be capped at that value.
+  virtual size_t Allocate(size_t requested, size_t minimal) = 0;
+
+  // Called by an allocator when the specified amount (in bytes) is released.
+  virtual void Free(size_t amount) = 0;
+
+  // TODO(user): consider making pure virtual.
+  virtual size_t Available() const { return std::numeric_limits<size_t>::max(); }
+};
+
+// Optionally thread-safe skeletal implementation of a 'quota' abstraction,
+// providing methods to allocate resources against the quota, and return them.
+template<bool thread_safe>
+class Quota : public Mediator {
+ public:
+  explicit Quota(bool enforced) : usage_(0), enforced_(enforced) {}
+  virtual ~Quota() {}
+
+  // Returns a value in range [minimal, requested] if not exceeding remaining
+  // quota or if the quota is not enforced (soft quota), and adjusts the usage
+  // value accordingly.  Otherwise, returns zero. The semantics of 'remaining
+  // quota' are defined by subclasses (that must supply GetQuotaInternal()
+  // method).
+  virtual size_t Allocate(size_t requested, size_t minimal) OVERRIDE;
+
+  virtual void Free(size_t amount) OVERRIDE;
+
+  // Returns memory still available in the quota. For unenforced Quota objects,
+  // you are still able to perform _minimal_ allocations when the available
+  // quota is 0 (or less than "minimal" param).
+  virtual size_t Available() const OVERRIDE {
+    lock_guard_maybe<Mutex> lock(Quota<thread_safe>::mutex());
+    const size_t quota = GetQuotaInternal();
+    return (usage_ >= quota) ? 0 : (quota - usage_);
+  }
+
+  // Returns the current quota value.
+  size_t GetQuota() const;
+
+  // Returns the current usage value, defined as a sum of all the values
+  // granted by calls to Allocate, less these released via calls to Free.
+  size_t GetUsage() const;
+
+  bool enforced() const {
+    return enforced_;
+  }
+
+ protected:
+  // Overridden by specific implementations, to define semantics of
+  // the quota, i.e. the total amount of resources that the mediator will
+  // allocate. Called directly from GetQuota that optionally provides
+  // thread safety. An 'Allocate' request will succeed if
+  // GetUsage() + minimal <= GetQuota() or if the quota is not enforced (soft
+  // quota).
+  virtual size_t GetQuotaInternal() const = 0;
+
+  Mutex* mutex() const { return thread_safe ? &mutex_ : NULL; }
+
+ private:
+  mutable Mutex mutex_;
+  size_t usage_;
+  bool enforced_;
+  DISALLOW_COPY_AND_ASSIGN(Quota);
+};
+
+// Optionally thread-safe static quota implementation (where quota is explicitly
+// set to a concrete numeric value).
+template<bool thread_safe>
+class StaticQuota : public Quota<thread_safe> {
+ public:
+  explicit StaticQuota(size_t quota)
+      : Quota<thread_safe>(true) {
+    SetQuota(quota);
+  }
+  StaticQuota(size_t quota, bool enforced)
+      : Quota<thread_safe>(enforced) {
+    SetQuota(quota);
+  }
+  virtual ~StaticQuota() {}
+
+  // Sets quota to the new value.
+  void SetQuota(const size_t quota);
+
+ protected:
+  virtual size_t GetQuotaInternal() const { return quota_; }
+
+ private:
+  size_t quota_;
+  DISALLOW_COPY_AND_ASSIGN(StaticQuota);
+};
+
+// Places resource limits on another allocator, using the specified Mediator
+// (e.g. quota) implementation.
+//
+// If the mediator and the delegate allocator are thread-safe, this allocator
+// is also thread-safe, to the extent that it will not introduce any
+// state inconsistencies. However, without additional synchronization,
+// allocation requests are not atomic end-to-end. This way, it is deadlock-
+// resilient (even if you have cyclic relationships between allocators) and
+// allows better concurrency. But, it may cause over-conservative
+// allocations under memory contention, if you have multiple levels of
+// mediating allocators. For example, if two requests that can't both be
+// satisfied are submitted concurrently, it may happen that one of them succeeds
+// but gets a smaller buffer allocated than it would if the requests were strictly
+// ordered. This is usually not a problem, however, as you don't really want to
+// operate so close to memory limits that some of your allocations can't be
+// satisfied. If you do have a simple, cascading graph of allocators though,
+// and want to force requests to be atomic end-to-end, put a
+// ThreadSafeBufferAllocator at the entry point.
+class MediatingBufferAllocator : public BufferAllocator {
+ public:
+  // Does not take ownership of the delegate, nor the mediator, allowing
+  // both to be reused.
+  MediatingBufferAllocator(BufferAllocator* const delegate,
+                           Mediator* const mediator)
+      : delegate_(delegate),
+        mediator_(mediator) {}
+
+  virtual ~MediatingBufferAllocator() {}
+
+  virtual size_t Available() const OVERRIDE {
+    return std::min(delegate_->Available(), mediator_->Available());
+  }
+
+ private:
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE;
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE;
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE;
+
+  BufferAllocator* delegate_;
+  Mediator* const mediator_;
+};
+
+// Convenience non-thread-safe static memory bounds enforcer.
+// Combines MediatingBufferAllocator with a StaticQuota.
+class MemoryLimit : public BufferAllocator {
+ public:
+  // Creates a limiter based on the default, heap allocator. Quota is infinite.
+  // (Can be set using SetQuota).
+  MemoryLimit()
+      : quota_(std::numeric_limits<size_t>::max()),
+        allocator_(HeapBufferAllocator::Get(), &quota_) {}
+
+  // Creates a limiter based on the default, heap allocator.
+  explicit MemoryLimit(size_t quota)
+      : quota_(quota),
+        allocator_(HeapBufferAllocator::Get(), &quota_) {}
+
+  // Creates a limiter relaying to the specified delegate allocator.
+  MemoryLimit(size_t quota, BufferAllocator* const delegate)
+      : quota_(quota),
+        allocator_(delegate, &quota_) {}
+
+  // Creates a (possibly non-enforcing) limiter relaying to the specified
+  // delegate allocator.
+  MemoryLimit(size_t quota, bool enforced, BufferAllocator* const delegate)
+      : quota_(quota, enforced),
+        allocator_(delegate, &quota_) {}
+
+  virtual ~MemoryLimit() {}
+
+  virtual size_t Available() const OVERRIDE {
+    return allocator_.Available();
+  }
+
+  size_t GetQuota() const { return quota_.GetQuota(); }
+  size_t GetUsage() const { return quota_.GetUsage(); }
+  void SetQuota(const size_t quota) { quota_.SetQuota(quota); }
+
+ private:
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE {
+    return DelegateAllocate(&allocator_, requested, minimal, originator);
+  }
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE {
+    return DelegateReallocate(&allocator_, requested, minimal, buffer,
+                              originator);
+  }
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE {
+    DelegateFree(&allocator_, buffer);
+  }
+
+  StaticQuota<false> quota_;
+  MediatingBufferAllocator allocator_;
+};
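+
+// Illustrative usage sketch (the quota and request sizes are arbitrary
+// example values):
+//
+//   MemoryLimit limit(64 * 1024 * 1024);  // 64 MB cap over the heap allocator.
+//   Buffer* buf = limit.Allocate(16 * 1024 * 1024);
+//   // On success, limit.GetUsage() reflects the 16 MB grant and
+//   // limit.Available() the remaining 48 MB.
+//   delete buf;  // Releases the quota again.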
+
+// An allocator that allows bypassing the (potential) soft quota below for a
+// given amount of memory usage. The goal is to make the allocation methods and
+// Available() work as if the allocator below had at least bypassed_amount of
+// soft quota. Of course this class doesn't allow exceeding the hard quota.
+class SoftQuotaBypassingBufferAllocator : public BufferAllocator {
+ public:
+  SoftQuotaBypassingBufferAllocator(BufferAllocator* allocator,
+                                    size_t bypassed_amount)
+      : allocator_(std::numeric_limits<size_t>::max(), allocator),
+        bypassed_amount_(bypassed_amount) {}
+
+  virtual size_t Available() const OVERRIDE {
+    const size_t usage = allocator_.GetUsage();
+    size_t available = allocator_.Available();
+    if (bypassed_amount_ > usage) {
+      available = std::max(bypassed_amount_ - usage, available);
+    }
+    return available;
+  }
+
+ private:
+  // Calculates how much to increase the minimal parameter to allocate more
+  // aggressively in the underlying allocator. This is to avoid getting only
+  // very small allocations when we exceed the soft quota below. The request
+  // with increased minimal size is more likely to fail because of exceeding
+  // hard quota, so we also fall back to the original minimal size.
+  size_t AdjustMinimal(size_t requested, size_t minimal) const {
+    return std::min(requested, std::max(minimal, Available()));
+  }
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE {
+    // Try increasing the "minimal" parameter to allocate more aggressively
+    // within the bypassed amount of soft quota.
+    Buffer* result = DelegateAllocate(&allocator_,
+                                      requested,
+                                      AdjustMinimal(requested, minimal),
+                                      originator);
+    if (result != NULL) {
+      return result;
+    } else {
+      return DelegateAllocate(&allocator_,
+                              requested,
+                              minimal,
+                              originator);
+    }
+  }
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE {
+    if (DelegateReallocate(&allocator_,
+                           requested,
+                           AdjustMinimal(requested, minimal),
+                           buffer,
+                           originator)) {
+      return true;
+    } else {
+      return DelegateReallocate(&allocator_,
+                                requested,
+                                minimal,
+                                buffer,
+                                originator);
+    }
+  }
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE {
+    DelegateFree(&allocator_, buffer);
+  }
+
+  // Using MemoryLimit with "infinite" limit to get GetUsage().
+  MemoryLimit allocator_;
+  size_t bypassed_amount_;
+};
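+
+// Illustrative behaviour sketch (all numbers are arbitrary example values):
+// with bypassed_amount = 100, a current usage of 40, and the allocator below
+// reporting only 20 bytes of remaining soft quota, Available() returns
+// max(100 - 40, 20) = 60. A request with requested = 80 and minimal = 10 is
+// first attempted with minimal raised to min(80, max(10, 60)) = 60, and only
+// falls back to the original minimal of 10 if that aggressive attempt fails.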
+
+// An interface for a MemoryStatisticsCollector - an object which collects
+// information about the memory usage of the allocator. The collector will
+// gather statistics about memory usage based on information received from the
+// allocator.
+class MemoryStatisticsCollectorInterface {
+ public:
+  MemoryStatisticsCollectorInterface() {}
+
+  virtual ~MemoryStatisticsCollectorInterface() {}
+
+  // Informs the collector that the allocator granted bytes memory. Note that in
+  // the case of reallocation bytes should be the increase in total memory
+  // usage, not the total size of the buffer after reallocation.
+  virtual void AllocatedMemoryBytes(size_t bytes) = 0;
+
+  // Informs the collector that the allocator received a request for at least
+  // bytes memory, and rejected it (meaning that it granted nothing).
+  virtual void RefusedMemoryBytes(size_t bytes) = 0;
+
+  // Informs the collector that bytes memory have been released to the
+  // allocator.
+  virtual void FreedMemoryBytes(size_t bytes) = 0;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(MemoryStatisticsCollectorInterface);
+};
+
+class MemoryStatisticsCollectingBufferAllocator : public BufferAllocator {
+ public:
+  // Does not take ownership of the delegate.
+  // Takes ownership of memory_stats_collector.
+  MemoryStatisticsCollectingBufferAllocator(
+      BufferAllocator* const delegate,
+      MemoryStatisticsCollectorInterface* const memory_stats_collector)
+      : delegate_(delegate),
+        memory_stats_collector_(memory_stats_collector) {}
+
+  virtual ~MemoryStatisticsCollectingBufferAllocator() {}
+
+  virtual size_t Available() const OVERRIDE {
+    return delegate_->Available();
+  }
+
+ private:
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE;
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE;
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE;
+
+  BufferAllocator* delegate_;
+  gscoped_ptr<MemoryStatisticsCollectorInterface>
+      memory_stats_collector_;
+};
+
+// BufferAllocator which uses MemTracker to keep track of and optionally
+// (if a limit is set on the MemTracker) regulate memory consumption.
+class MemoryTrackingBufferAllocator : public BufferAllocator {
+ public:
+  // Does not take ownership of the delegate. The delegate must remain
+  // valid for the lifetime of this allocator. Increments reference
+  // count for 'mem_tracker'.
+  // If 'mem_tracker' has a limit and 'enforce_limit' is true, then
+  // the classes calling this buffer allocator (whether directly, or
+  // through an Arena) must be able to handle the case when allocation
+  // fails. If 'enforce_limit' is false (this is the default), then
+  // allocation will always succeed.
+  MemoryTrackingBufferAllocator(BufferAllocator* const delegate,
+                                std::shared_ptr<MemTracker> mem_tracker,
+                                bool enforce_limit = false)
+      : delegate_(delegate),
+        mem_tracker_(std::move(mem_tracker)),
+        enforce_limit_(enforce_limit) {}
+
+  virtual ~MemoryTrackingBufferAllocator() {}
+
+  // If enforce limit is false, this always returns maximum possible value
+  // for int64_t (std::numeric_limits<int64_t>::max()). Otherwise, this
+  // is equivalent to calling mem_tracker_->SpareCapacity();
+  virtual size_t Available() const OVERRIDE;
+
+ private:
+
+  // If enforce_limit_ is true, this is equivalent to calling
+  // mem_tracker_->TryConsume(bytes). If enforce_limit_ is false and
+  // mem_tracker_->TryConsume(bytes) is false, we call
+  // mem_tracker_->Consume(bytes) and always return true.
+  bool TryConsume(int64_t bytes);
+
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE;
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE;
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE;
+
+  BufferAllocator* delegate_;
+  std::shared_ptr<MemTracker> mem_tracker_;
+  bool enforce_limit_;
+};
+
+// Synchronizes access to AllocateInternal and FreeInternal, and exposes the
+// mutex for use by subclasses. Allocation requests performed through this
+// allocator are atomic end-to-end. Template parameter DelegateAllocatorType
+// allows specifying a subclass of BufferAllocator for the delegate, to allow
+// subclasses of ThreadSafeBufferAllocator to access additional methods provided
+// by the allocator subclass. If this is not needed, it can be set to
+// BufferAllocator.
+template <class DelegateAllocatorType>
+class ThreadSafeBufferAllocator : public BufferAllocator {
+ public:
+  // Does not take ownership of the delegate.
+  explicit ThreadSafeBufferAllocator(DelegateAllocatorType* delegate)
+      : delegate_(delegate) {}
+  virtual ~ThreadSafeBufferAllocator() {}
+
+  virtual size_t Available() const OVERRIDE {
+    lock_guard_maybe<Mutex> lock(mutex());
+    return delegate()->Available();
+  }
+
+ protected:
+  Mutex* mutex() const { return &mutex_; }
+  // Expose the delegate allocator, with the precise type of the allocator
+  // specified by the template parameter. The delegate() methods themselves
+  // don't give any thread-safety guarantees. Protect all uses taking the Mutex
+  // exposed by the mutex() method.
+  DelegateAllocatorType* delegate() { return delegate_; }
+  const DelegateAllocatorType* delegate() const { return delegate_; }
+
+ private:
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE {
+    lock_guard_maybe<Mutex> lock(mutex());
+    return DelegateAllocate(delegate(), requested, minimal, originator);
+  }
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE {
+    lock_guard_maybe<Mutex> lock(mutex());
+    return DelegateReallocate(delegate(), requested, minimal, buffer,
+                              originator);
+  }
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE {
+    lock_guard_maybe<Mutex> lock(mutex());
+    DelegateFree(delegate(), buffer);
+  }
+
+  DelegateAllocatorType* delegate_;
+  mutable Mutex mutex_;
+  DISALLOW_COPY_AND_ASSIGN(ThreadSafeBufferAllocator);
+};
+
+// A version of ThreadSafeBufferAllocator that owns the supplied delegate
+// allocator.
+template <class DelegateAllocatorType>
+class OwningThreadSafeBufferAllocator
+    : public ThreadSafeBufferAllocator<DelegateAllocatorType> {
+ public:
+  explicit OwningThreadSafeBufferAllocator(DelegateAllocatorType* delegate)
+      : ThreadSafeBufferAllocator<DelegateAllocatorType>(delegate),
+        delegate_owned_(delegate) {}
+  virtual ~OwningThreadSafeBufferAllocator() {}
+
+ private:
+  gscoped_ptr<DelegateAllocatorType> delegate_owned_;
+};
+
+class ThreadSafeMemoryLimit
+    : public OwningThreadSafeBufferAllocator<MemoryLimit> {
+ public:
+  ThreadSafeMemoryLimit(size_t quota, bool enforced,
+                        BufferAllocator* const delegate)
+      : OwningThreadSafeBufferAllocator<MemoryLimit>(
+            new MemoryLimit(quota, enforced, delegate)) {}
+  virtual ~ThreadSafeMemoryLimit() {}
+
+  size_t GetQuota() const {
+    lock_guard_maybe<Mutex> lock(mutex());
+    return delegate()->GetQuota();
+  }
+  size_t GetUsage() const {
+    lock_guard_maybe<Mutex> lock(mutex());
+    return delegate()->GetUsage();
+  }
+  void SetQuota(const size_t quota) {
+    lock_guard_maybe<Mutex> lock(mutex());
+    delegate()->SetQuota(quota);
+  }
+};
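+
+// Illustrative usage sketch (the quota value is an arbitrary example):
+//
+//   ThreadSafeMemoryLimit shared_limit(256 * 1024 * 1024, /* enforced = */ true,
+//                                      HeapBufferAllocator::Get());
+//   // Allocation and release through 'shared_limit' can now happen from
+//   // multiple threads; each request is serialized by the internal mutex.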
+
+// A BufferAllocator that can be given ownership of many objects of given type.
+// These objects will then be deleted when the buffer allocator is destroyed.
+// The objects added last are deleted first (LIFO).
+template <typename OwnedType>
+class OwningBufferAllocator : public BufferAllocator {
+ public:
+  // Doesn't take ownership of delegate.
+  explicit OwningBufferAllocator(BufferAllocator* const delegate)
+      : delegate_(delegate) {}
+
+  virtual ~OwningBufferAllocator() {
+    // Delete elements starting from the end.
+    while (!owned_.empty()) {
+      OwnedType* p = owned_.back();
+      owned_.pop_back();
+      delete p;
+    }
+  }
+
+  // Add to the collection of objects owned by this allocator. The object added
+  // last is deleted first.
+  OwningBufferAllocator* Add(OwnedType* p) {
+    owned_.push_back(p);
+    return this;
+  }
+
+  virtual size_t Available() const OVERRIDE {
+    return delegate_->Available();
+  }
+
+ private:
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE {
+    return DelegateAllocate(delegate_, requested, minimal, originator);
+  }
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t minimal,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE {
+    return DelegateReallocate(delegate_, requested, minimal, buffer,
+                              originator);
+  }
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE {
+    DelegateFree(delegate_, buffer);
+  }
+
+  // Not using PointerVector here because we want to guarantee certain order of
+  // deleting elements (starting from the ones added last).
+  std::vector<OwnedType*> owned_;
+  BufferAllocator* delegate_;
+};
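+
+// Illustrative usage sketch (the owned type and values are arbitrary examples):
+//
+//   OwningBufferAllocator<std::string> owning(HeapBufferAllocator::Get());
+//   owning.Add(new std::string("deleted last"))
+//         ->Add(new std::string("deleted first"));
+//   // Both strings are deleted, in LIFO order, when 'owning' is destroyed;
+//   // allocation requests are simply forwarded to the heap allocator.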
+
+// Buffer allocator that tries to guarantee the exact and consistent amount
+// of memory. Uses hard MemoryLimit to enforce the upper bound but also
+// guarantees consistent allocations by ignoring minimal requested amounts and
+// always returning the full amount of memory requested if available.
+// Allocations will fail if the memory requested would exceed the quota or if
+// the underlying allocator fails to provide the memory.
+class GuaranteeMemory : public BufferAllocator {
+ public:
+  // Doesn't take ownership of 'delegate'.
+  GuaranteeMemory(size_t memory_quota,
+                  BufferAllocator* delegate)
+      : limit_(memory_quota, true, delegate),
+        memory_guarantee_(memory_quota) {}
+
+  virtual size_t Available() const OVERRIDE {
+    return memory_guarantee_ - limit_.GetUsage();
+  }
+
+ private:
+  virtual Buffer* AllocateInternal(size_t requested,
+                                   size_t minimal,
+                                   BufferAllocator* originator) OVERRIDE {
+    if (requested > Available()) {
+      return NULL;
+    } else {
+      return DelegateAllocate(&limit_, requested, requested, originator);
+    }
+  }
+
+  virtual bool ReallocateInternal(size_t requested,
+                                  size_t /* minimal */,
+                                  Buffer* buffer,
+                                  BufferAllocator* originator) OVERRIDE {
+    int64_t additional_memory = requested - (buffer != NULL ? buffer->size() : 0);
+    return additional_memory <= static_cast<int64_t>(Available())
+        && DelegateReallocate(&limit_, requested, requested,
+                              buffer, originator);
+  }
+
+  virtual void FreeInternal(Buffer* buffer) OVERRIDE {
+    DelegateFree(&limit_, buffer);
+  }
+
+  MemoryLimit limit_;
+  size_t memory_guarantee_;
+  DISALLOW_COPY_AND_ASSIGN(GuaranteeMemory);
+};
+
+// Implementation of inline and template methods
+
+template<bool thread_safe>
+size_t Quota<thread_safe>::Allocate(const size_t requested,
+                                    const size_t minimal) {
+  lock_guard_maybe<Mutex> lock(mutex());
+  DCHECK_LE(minimal, requested)
+      << "\"minimal\" shouldn't be bigger than \"requested\"";
+  const size_t quota = GetQuotaInternal();
+  size_t allocation;
+  if (usage_ > quota || minimal > quota - usage_) {
+    // OOQ (Out of quota).
+    if (!enforced() && minimal <= std::numeric_limits<size_t>::max() - usage_) {
+      // The quota is unenforced and the value of "minimal" won't cause an
+      // overflow. Perform a minimal allocation.
+      allocation = minimal;
+    } else {
+      allocation = 0;
+    }
+    LOG(WARNING) << "Out of quota. Requested: " << requested
+                 << " bytes, or at least minimal: " << minimal
+                 << ". Current quota value is: " << quota
+                 << " while current usage is: " << usage_
+                 << ". The quota is " << (enforced() ? "" : "not ")
+                 << "enforced. "
+                 << ((allocation == 0) ? "Did not allocate any memory."
+                 : "Allocated the minimal value requested.");
+  } else {
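+    // Within the quota: grant as much of 'requested' as the remaining quota
+    // allows. For example (arbitrary values), with quota = 100, usage_ = 90,
+    // requested = 30 and minimal = 5, the grant is min(30, 100 - 90) = 10.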
+    allocation = std::min(requested, quota - usage_);
+  }
+  usage_ += allocation;
+  return allocation;
+}
+
+template<bool thread_safe>
+void Quota<thread_safe>::Free(size_t amount) {
+  lock_guard_maybe<Mutex> lock(mutex());
+  usage_ -= amount;
+  // Check for possible wraparound of usage_, which can happen e.g. when
+  // threads allocate/free memory concurrently via the same Quota object that is
+  // not protected with a mutex (thread_safe == false).
+  if (usage_ > (std::numeric_limits<size_t>::max() - (1 << 28))) {
+    LOG(ERROR) << "Suspiciously big usage_ value: " << usage_
+               << " (could be a result of size_t wrapping around below 0, "
+               << "for example as a result of a race condition).";
+  }
+}
+
+template<bool thread_safe>
+size_t Quota<thread_safe>::GetQuota() const {
+  lock_guard_maybe<Mutex> lock(mutex());
+  return GetQuotaInternal();
+}
+
+template<bool thread_safe>
+size_t Quota<thread_safe>::GetUsage() const {
+  lock_guard_maybe<Mutex> lock(mutex());
+  return usage_;
+}
+
+template<bool thread_safe>
+void StaticQuota<thread_safe>::SetQuota(const size_t quota) {
+  lock_guard_maybe<Mutex> lock(Quota<thread_safe>::mutex());
+  quota_ = quota;
+}
+
+}  // namespace kudu
+
+#endif  // KUDU_UTIL_MEMORY_MEMORY_H_

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/overwrite.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/overwrite.cc b/be/src/kudu/util/memory/overwrite.cc
new file mode 100644
index 0000000..cca9227
--- /dev/null
+++ b/be/src/kudu/util/memory/overwrite.cc
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/memory/overwrite.h"
+
+#include "kudu/gutil/strings/stringpiece.h"
+
+#include <string.h>
+#include <glog/logging.h>
+namespace kudu {
+
+void OverwriteWithPattern(char* p, size_t len, StringPiece pattern) {
+  size_t pat_len = pattern.size();
+  CHECK_LT(0, pat_len);
+  size_t rem = len;
+  const char *pat_ptr = pattern.data();
+
+  for (; rem >= pat_len; rem -= pat_len) {
+    memcpy(p, pat_ptr, pat_len);
+    p += pat_len;
+  }
+
+  for (; rem > 0; rem--) {
+    *p++ = *pat_ptr++;
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/overwrite.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/overwrite.h b/be/src/kudu/util/memory/overwrite.h
new file mode 100644
index 0000000..37c6512
--- /dev/null
+++ b/be/src/kudu/util/memory/overwrite.h
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_MEMORY_OVERWRITE_H
+#define KUDU_MEMORY_OVERWRITE_H
+
+#include <cstddef>
+
+#include "kudu/gutil/strings/stringpiece.h"
+
+namespace kudu {
+
+// Overwrite 'p' with enough repetitions of 'pattern' to fill 'len'
+// bytes. This is optimized at -O3 even in debug builds, so is
+// reasonably efficient to use.
+void OverwriteWithPattern(char* p, size_t len, StringPiece pattern);
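+
+// Illustrative usage sketch (buffer size and pattern are arbitrary examples):
+//
+//   char scratch[16];
+//   OverwriteWithPattern(scratch, sizeof(scratch), "DEADBEEF");
+//   // scratch now holds "DEADBEEFDEADBEEF"; no trailing NUL is written.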
+
+} // namespace kudu
+#endif /* KUDU_MEMORY_OVERWRITE_H */
+

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/metrics-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/metrics-test.cc b/be/src/kudu/util/metrics-test.cc
new file mode 100644
index 0000000..f8776aa
--- /dev/null
+++ b/be/src/kudu/util/metrics-test.cc
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <ostream>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <rapidjson/document.h>
+
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/bind_helpers.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/hdr_histogram.h"
+#include "kudu/util/jsonreader.h"
+#include "kudu/util/jsonwriter.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::unordered_set;
+using std::vector;
+
+DECLARE_int32(metrics_retirement_age_ms);
+
+namespace kudu {
+
+METRIC_DEFINE_entity(test_entity);
+
+class MetricsTest : public KuduTest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+
+    entity_ = METRIC_ENTITY_test_entity.Instantiate(&registry_, "my-test");
+  }
+
+ protected:
+  MetricRegistry registry_;
+  scoped_refptr<MetricEntity> entity_;
+};
+
+METRIC_DEFINE_counter(test_entity, test_counter, "My Test Counter", MetricUnit::kRequests,
+                      "Description of test counter");
+
+TEST_F(MetricsTest, SimpleCounterTest) {
+  scoped_refptr<Counter> requests =
+    new Counter(&METRIC_test_counter);
+  ASSERT_EQ("Description of test counter", 
requests->prototype()->description());
+  ASSERT_EQ(0, requests->value());
+  requests->Increment();
+  ASSERT_EQ(1, requests->value());
+  requests->IncrementBy(2);
+  ASSERT_EQ(3, requests->value());
+}
+
+METRIC_DEFINE_gauge_uint64(test_entity, test_gauge, "Test uint64 Gauge",
+                           MetricUnit::kBytes, "Description of Test Gauge");
+
+TEST_F(MetricsTest, SimpleAtomicGaugeTest) {
+  scoped_refptr<AtomicGauge<uint64_t> > mem_usage =
+    METRIC_test_gauge.Instantiate(entity_, 0);
+  ASSERT_EQ(METRIC_test_gauge.description(), mem_usage->prototype()->description());
+  ASSERT_EQ(0, mem_usage->value());
+  mem_usage->IncrementBy(7);
+  ASSERT_EQ(7, mem_usage->value());
+  mem_usage->set_value(5);
+  ASSERT_EQ(5, mem_usage->value());
+}
+
+METRIC_DEFINE_gauge_int64(test_entity, test_func_gauge, "Test Function Gauge",
+                          MetricUnit::kBytes, "Test Gauge 2");
+
+static int64_t MyFunction(int* metric_val) {
+  return (*metric_val)++;
+}
+
+TEST_F(MetricsTest, SimpleFunctionGaugeTest) {
+  int metric_val = 1000;
+  scoped_refptr<FunctionGauge<int64_t> > gauge =
+    METRIC_test_func_gauge.InstantiateFunctionGauge(
+      entity_, Bind(&MyFunction, Unretained(&metric_val)));
+
+  ASSERT_EQ(1000, gauge->value());
+  ASSERT_EQ(1001, gauge->value());
+
+  gauge->DetachToCurrentValue();
+  // After detaching, it should continue to return the same constant value.
+  ASSERT_EQ(1002, gauge->value());
+  ASSERT_EQ(1002, gauge->value());
+
+  // Test resetting to a constant.
+  gauge->DetachToConstant(2);
+  ASSERT_EQ(2, gauge->value());
+}
+
+TEST_F(MetricsTest, AutoDetachToLastValue) {
+  int metric_val = 1000;
+  scoped_refptr<FunctionGauge<int64_t> > gauge =
+    METRIC_test_func_gauge.InstantiateFunctionGauge(
+        entity_, Bind(&MyFunction, Unretained(&metric_val)));
+
+  ASSERT_EQ(1000, gauge->value());
+  ASSERT_EQ(1001, gauge->value());
+  {
+    FunctionGaugeDetacher detacher;
+    gauge->AutoDetachToLastValue(&detacher);
+    ASSERT_EQ(1002, gauge->value());
+    ASSERT_EQ(1003, gauge->value());
+  }
+
+  ASSERT_EQ(1004, gauge->value());
+  ASSERT_EQ(1004, gauge->value());
+}
+
+TEST_F(MetricsTest, AutoDetachToConstant) {
+  int metric_val = 1000;
+  scoped_refptr<FunctionGauge<int64_t> > gauge =
+    METRIC_test_func_gauge.InstantiateFunctionGauge(
+        entity_, Bind(&MyFunction, Unretained(&metric_val)));
+
+  ASSERT_EQ(1000, gauge->value());
+  ASSERT_EQ(1001, gauge->value());
+  {
+    FunctionGaugeDetacher detacher;
+    gauge->AutoDetach(&detacher, 12345);
+    ASSERT_EQ(1002, gauge->value());
+    ASSERT_EQ(1003, gauge->value());
+  }
+
+  ASSERT_EQ(12345, gauge->value());
+}
+
+METRIC_DEFINE_gauge_uint64(test_entity, counter_as_gauge, "Gauge exposed as Counter",
+                           MetricUnit::kBytes, "Gauge exposed as Counter",
+                           EXPOSE_AS_COUNTER);
+TEST_F(MetricsTest, TestExposeGaugeAsCounter) {
+  ASSERT_EQ(MetricType::kCounter, METRIC_counter_as_gauge.type());
+}
+
+METRIC_DEFINE_histogram(test_entity, test_hist, "Test Histogram",
+                        MetricUnit::kMilliseconds, "foo", 1000000, 3);
+
+TEST_F(MetricsTest, SimpleHistogramTest) {
+  scoped_refptr<Histogram> hist = METRIC_test_hist.Instantiate(entity_);
+  hist->Increment(2);
+  hist->IncrementBy(4, 1);
+  ASSERT_EQ(2, hist->histogram_->MinValue());
+  ASSERT_EQ(3, hist->histogram_->MeanValue());
+  ASSERT_EQ(4, hist->histogram_->MaxValue());
+  ASSERT_EQ(2, hist->histogram_->TotalCount());
+  ASSERT_EQ(6, hist->histogram_->TotalSum());
+  // TODO: Test coverage needs to be improved a lot.
+}
+
+TEST_F(MetricsTest, JsonPrintTest) {
+  scoped_refptr<Counter> test_counter = METRIC_test_counter.Instantiate(entity_);
+  test_counter->Increment();
+  entity_->SetAttribute("test_attr", "attr_val");
+
+  // Generate the JSON.
+  std::ostringstream out;
+  JsonWriter writer(&out, JsonWriter::PRETTY);
+  ASSERT_OK(entity_->WriteAsJson(&writer, { "*" }, MetricJsonOptions()));
+
+  // Now parse it back out.
+  JsonReader reader(out.str());
+  ASSERT_OK(reader.Init());
+
+  vector<const rapidjson::Value*> metrics;
+  ASSERT_OK(reader.ExtractObjectArray(reader.root(), "metrics", &metrics));
+  ASSERT_EQ(1, metrics.size());
+  string metric_name;
+  ASSERT_OK(reader.ExtractString(metrics[0], "name", &metric_name));
+  ASSERT_EQ("test_counter", metric_name);
+  int64_t metric_value;
+  ASSERT_OK(reader.ExtractInt64(metrics[0], "value", &metric_value));
+  ASSERT_EQ(1L, metric_value);
+
+  const rapidjson::Value* attributes;
+  ASSERT_OK(reader.ExtractObject(reader.root(), "attributes", &attributes));
+  string attr_value;
+  ASSERT_OK(reader.ExtractString(attributes, "test_attr", &attr_value));
+  ASSERT_EQ("attr_val", attr_value);
+
+  // Verify that metric filtering matches on substrings.
+  out.str("");
+  ASSERT_OK(entity_->WriteAsJson(&writer, { "test count" }, 
MetricJsonOptions()));
+  ASSERT_STR_CONTAINS(METRIC_test_counter.name(), out.str());
+
+  // Verify that, if we filter for a metric that isn't in this entity, we get 
no result.
+  out.str("");
+  ASSERT_OK(entity_->WriteAsJson(&writer, { "not_a_matching_metric" }, 
MetricJsonOptions()));
+  ASSERT_EQ("", out.str());
+
+  // Verify that filtering is case-insensitive.
+  out.str("");
+  ASSERT_OK(entity_->WriteAsJson(&writer, { "mY teST coUNteR" }, 
MetricJsonOptions()));
+  ASSERT_STR_CONTAINS(METRIC_test_counter.name(), out.str());
+}
+
+// Test that metrics are retired when they are no longer referenced.
+TEST_F(MetricsTest, RetirementTest) {
+  FLAGS_metrics_retirement_age_ms = 100;
+
+  const string kMetricName = "foo";
+  scoped_refptr<Counter> counter = METRIC_test_counter.Instantiate(entity_);
+  ASSERT_EQ(1, entity_->UnsafeMetricsMapForTests().size());
+
+  // Since we hold a reference to the counter, it should not get retired.
+  entity_->RetireOldMetrics();
+  ASSERT_EQ(1, entity_->UnsafeMetricsMapForTests().size());
+
+  // When we de-ref it, it should not get immediately retired, either, because
+  // we keep retirable metrics around for some amount of time. We try retiring
+  // a number of times to hit all the cases.
+  counter = nullptr;
+  for (int i = 0; i < 3; i++) {
+    entity_->RetireOldMetrics();
+    ASSERT_EQ(1, entity_->UnsafeMetricsMapForTests().size());
+  }
+
+  // If we wait for longer than the retirement time, and call retire again, we'll
+  // actually retire it.
+  SleepFor(MonoDelta::FromMilliseconds(FLAGS_metrics_retirement_age_ms * 1.5));
+  entity_->RetireOldMetrics();
+  ASSERT_EQ(0, entity_->UnsafeMetricsMapForTests().size());
+}
+
+TEST_F(MetricsTest, TestRetiringEntities) {
+  ASSERT_EQ(1, registry_.num_entities());
+
+  // Drop the reference to our entity.
+  entity_.reset();
+
+  // Retire metrics. Since there is nothing inside our entity, it should
+  // retire immediately (no need to loop).
+  registry_.RetireOldMetrics();
+
+  ASSERT_EQ(0, registry_.num_entities());
+}
+
+// Test that we can mark a metric to never be retired.
+TEST_F(MetricsTest, NeverRetireTest) {
+  entity_->NeverRetire(METRIC_test_hist.Instantiate(entity_));
+  FLAGS_metrics_retirement_age_ms = 0;
+
+  for (int i = 0; i < 3; i++) {
+    entity_->RetireOldMetrics();
+    ASSERT_EQ(1, entity_->UnsafeMetricsMapForTests().size());
+  }
+}
+
+TEST_F(MetricsTest, TestInstantiatingTwice) {
+  // Test that re-instantiating the same entity ID returns the same object.
+  scoped_refptr<MetricEntity> new_entity = METRIC_ENTITY_test_entity.Instantiate(
+      &registry_, entity_->id());
+  ASSERT_EQ(new_entity.get(), entity_.get());
+}
+
+TEST_F(MetricsTest, TestInstantiatingDifferentEntities) {
+  scoped_refptr<MetricEntity> new_entity = METRIC_ENTITY_test_entity.Instantiate(
+      &registry_, "some other ID");
+  ASSERT_NE(new_entity.get(), entity_.get());
+}
+
+TEST_F(MetricsTest, TestDumpJsonPrototypes) {
+  // Dump the prototype info.
+  std::ostringstream out;
+  JsonWriter w(&out, JsonWriter::PRETTY);
+  MetricPrototypeRegistry::get()->WriteAsJson(&w);
+  string json = out.str();
+
+  // Quick sanity check for one of our metrics defined in this file.
+  const char* expected =
+    "        {\n"
+    "            \"name\": \"test_func_gauge\",\n"
+    "            \"label\": \"Test Function Gauge\",\n"
+    "            \"type\": \"gauge\",\n"
+    "            \"unit\": \"bytes\",\n"
+    "            \"description\": \"Test Gauge 2\",\n"
+    "            \"entity_type\": \"test_entity\"\n"
+    "        }";
+  ASSERT_STR_CONTAINS(json, expected);
+
+  // Parse it.
+  rapidjson::Document d;
+  d.Parse<0>(json.c_str());
+
+  // Ensure that we got a reasonable number of metrics.
+  int num_metrics = d["metrics"].Size();
+  int num_entities = d["entities"].Size();
+  LOG(INFO) << "Parsed " << num_metrics << " metrics and " << num_entities << 
" entities";
+  ASSERT_GT(num_metrics, 5);
+  ASSERT_EQ(num_entities, 2);
+
+  // Spot-check that some metrics were properly registered and that the JSON was properly
+  // formed.
+  unordered_set<string> seen_metrics;
+  for (int i = 0; i < d["metrics"].Size(); i++) {
+    InsertOrDie(&seen_metrics, d["metrics"][i]["name"].GetString());
+  }
+  ASSERT_TRUE(ContainsKey(seen_metrics, "threads_started"));
+  ASSERT_TRUE(ContainsKey(seen_metrics, "test_hist"));
+}
+
+TEST_F(MetricsTest, TestDumpOnlyChanged) {
+  auto GetJson = [&](int64_t since_epoch) {
+    MetricJsonOptions opts;
+    opts.only_modified_in_or_after_epoch = since_epoch;
+    std::ostringstream out;
+    JsonWriter writer(&out, JsonWriter::COMPACT);
+    CHECK_OK(entity_->WriteAsJson(&writer, { "*" }, opts));
+    return out.str();
+  };
+
+  scoped_refptr<Counter> test_counter = METRIC_test_counter.Instantiate(entity_);
+
+  int64_t epoch_when_modified = Metric::current_epoch();
+  test_counter->Increment();
+
+  // If we pass a "since dirty" epoch from before we incremented it, we should
+  // see the metric.
+  for (int i = 0; i < 2; i++) {
+    ASSERT_STR_CONTAINS(GetJson(epoch_when_modified), "{\"name\":\"test_counter\",\"value\":1}");
+    Metric::IncrementEpoch();
+  }
+
+  // If we pass a current epoch, we should see that the metric was not modified.
+  int64_t new_epoch = Metric::current_epoch();
+  ASSERT_STR_NOT_CONTAINS(GetJson(new_epoch), "test_counter");
+  // ... until we modify it again.
+  test_counter->Increment();
+  ASSERT_STR_CONTAINS(GetJson(new_epoch), "{\"name\":\"test_counter\",\"value\":2}");
+}
+
+
+// Test that 'include_untouched_metrics=false' prevents dumping counters and histograms
+// which have never been incremented.
+TEST_F(MetricsTest, TestDontDumpUntouched) {
+  // Instantiate a bunch of metrics.
+  int metric_val = 1000;
+  scoped_refptr<Counter> test_counter = METRIC_test_counter.Instantiate(entity_);
+  scoped_refptr<Histogram> hist = METRIC_test_hist.Instantiate(entity_);
+  scoped_refptr<FunctionGauge<int64_t> > function_gauge =
+    METRIC_test_func_gauge.InstantiateFunctionGauge(
+        entity_, Bind(&MyFunction, Unretained(&metric_val)));
+  scoped_refptr<AtomicGauge<uint64_t> > atomic_gauge =
+    METRIC_test_gauge.Instantiate(entity_, 0);
+
+  MetricJsonOptions opts;
+  opts.include_untouched_metrics = false;
+  std::ostringstream out;
+  JsonWriter writer(&out, JsonWriter::COMPACT);
+  CHECK_OK(entity_->WriteAsJson(&writer, { "*" }, opts));
+  // Untouched counters and histograms should not be included.
+  ASSERT_STR_NOT_CONTAINS(out.str(), "test_counter");
+  ASSERT_STR_NOT_CONTAINS(out.str(), "test_hist");
+  // Untouched gauges need to be included, because we don't actually
+  // track whether they have been touched.
+  ASSERT_STR_CONTAINS(out.str(), "test_func_gauge");
+  ASSERT_STR_CONTAINS(out.str(), "test_gauge");
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/metrics.cc b/be/src/kudu/util/metrics.cc
new file mode 100644
index 0000000..dc30360
--- /dev/null
+++ b/be/src/kudu/util/metrics.cc
@@ -0,0 +1,746 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/metrics.h"
+
+#include <iostream>
+#include <map>
+#include <utility>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/singleton.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/hdr_histogram.h"
+#include "kudu/util/histogram.pb.h"
+#include "kudu/util/status.h"
+#include "kudu/util/string_case.h"
+
+DEFINE_int32(metrics_retirement_age_ms, 120 * 1000,
+             "The minimum number of milliseconds a metric will be kept for 
after it is "
+             "no longer active. (Advanced option)");
+TAG_FLAG(metrics_retirement_age_ms, runtime);
+TAG_FLAG(metrics_retirement_age_ms, advanced);
+
+// Process/server-wide metrics should go into the 'server' entity.
+// More complex applications will define other entities.
+METRIC_DEFINE_entity(server);
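An application-defined entity follows the same pattern: declare the entity prototype once at file scope, then instantiate it per logical object against a MetricRegistry. A hedged sketch; the 'tablet' entity name and the attribute key below are illustrative examples, not definitions made by this file:

    // Hypothetical non-server entity type.
    METRIC_DEFINE_entity(tablet);

    void ExampleInstantiation(kudu::MetricRegistry* registry) {
      kudu::MetricEntity::AttributeMap attrs;
      attrs["table_name"] = "example_table";  // arbitrary example attribute
      scoped_refptr<kudu::MetricEntity> entity =
          METRIC_ENTITY_tablet.Instantiate(registry, "tablet-0000", attrs);
      // Metrics whose prototypes declare entity type 'tablet' can now be
      // instantiated against 'entity'.
    }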
+
+namespace kudu {
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+//
+// MetricUnit
+//
+
+const char* MetricUnit::Name(Type unit) {
+  switch (unit) {
+    case kCacheHits:
+      return "hits";
+    case kCacheQueries:
+      return "queries";
+    case kBytes:
+      return "bytes";
+    case kRequests:
+      return "requests";
+    case kEntries:
+      return "entries";
+    case kRows:
+      return "rows";
+    case kCells:
+      return "cells";
+    case kConnections:
+      return "connections";
+    case kOperations:
+      return "operations";
+    case kProbes:
+      return "probes";
+    case kNanoseconds:
+      return "nanoseconds";
+    case kMicroseconds:
+      return "microseconds";
+    case kMilliseconds:
+      return "milliseconds";
+    case kSeconds:
+      return "seconds";
+    case kThreads:
+      return "threads";
+    case kTransactions:
+      return "transactions";
+    case kUnits:
+      return "units";
+    case kScanners:
+      return "scanners";
+    case kMaintenanceOperations:
+      return "operations";
+    case kBlocks:
+      return "blocks";
+    case kHoles:
+      return "holes";
+    case kLogBlockContainers:
+      return "log block containers";
+    case kTasks:
+      return "tasks";
+    case kMessages:
+      return "messages";
+    case kContextSwitches:
+      return "context switches";
+    case kDataDirectories:
+      return "data directories";
+    case kState:
+      return "state";
+    case kSessions:
+      return "sessions";
+    case kTablets:
+      return "tablets";
+    default:
+      DCHECK(false) << "Unknown unit with type = " << unit;
+      return "UNKNOWN UNIT";
+  }
+}
+
+//
+// MetricType
+//
+
+const char* const MetricType::kGaugeType = "gauge";
+const char* const MetricType::kCounterType = "counter";
+const char* const MetricType::kHistogramType = "histogram";
+const char* MetricType::Name(MetricType::Type type) {
+  switch (type) {
+    case kGauge:
+      return kGaugeType;
+    case kCounter:
+      return kCounterType;
+    case kHistogram:
+      return kHistogramType;
+    default:
+      return "UNKNOWN TYPE";
+  }
+}
+
+//
+// MetricEntityPrototype
+//
+
+MetricEntityPrototype::MetricEntityPrototype(const char* name)
+  : name_(name) {
+  MetricPrototypeRegistry::get()->AddEntity(this);
+}
+
+MetricEntityPrototype::~MetricEntityPrototype() {
+}
+
+scoped_refptr<MetricEntity> MetricEntityPrototype::Instantiate(
+    MetricRegistry* registry,
+    const std::string& id,
+    const MetricEntity::AttributeMap& initial_attrs) const {
+  return registry->FindOrCreateEntity(this, id, initial_attrs);
+}
+
+
+//
+// MetricEntity
+//
+
+MetricEntity::MetricEntity(const MetricEntityPrototype* prototype,
+                           std::string id, AttributeMap attributes)
+    : prototype_(prototype),
+      id_(std::move(id)),
+      attributes_(std::move(attributes)),
+      published_(true) {}
+
+MetricEntity::~MetricEntity() {
+}
+
+void MetricEntity::CheckInstantiation(const MetricPrototype* proto) const {
+  CHECK_STREQ(prototype_->name(), proto->entity_type())
+    << "Metric " << proto->name() << " may not be instantiated entity of type "
+    << prototype_->name() << " (expected: " << proto->entity_type() << ")";
+}
+
+scoped_refptr<Metric> MetricEntity::FindOrNull(const MetricPrototype& prototype) const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return FindPtrOrNull(metric_map_, &prototype);
+}
+
+namespace {
+
+bool MatchMetricInList(const string& metric_name,
+                       const vector<string>& match_params) {
+  string metric_name_uc;
+  ToUpperCase(metric_name, &metric_name_uc);
+
+  for (const string& param : match_params) {
+    // Handle wildcard.
+    if (param == "*") return true;
+    // The parameter is a case-insensitive substring match of the metric name.
+    string param_uc;
+    ToUpperCase(param, &param_uc);
+    if (metric_name_uc.find(param_uc) != std::string::npos) {
+      return true;
+    }
+  }
+  return false;
+}
+
+} // anonymous namespace
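In other words, the requested_metrics filter treats "*" as a wildcard and everything else as a case-insensitive substring of the metric (or entity) name. For instance, all of the following hold for the matcher above:

    // Illustrative expectations (not actual tests in this patch):
    //   MatchMetricInList("test_counter", { "*" })       -> true   (wildcard)
    //   MatchMetricInList("test_counter", { "COUNTER" }) -> true   (case-insensitive substring)
    //   MatchMetricInList("test_counter", { "hist" })    -> false  (no substring match)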
+
+
+Status MetricEntity::WriteAsJson(JsonWriter* writer,
+                                 const vector<string>& requested_metrics,
+                                 const MetricJsonOptions& opts) const {
+  bool select_all = MatchMetricInList(id(), requested_metrics);
+
+  // We want the keys to be in alphabetical order when printing, so we use an ordered map here.
+  typedef std::map<const char*, scoped_refptr<Metric> > OrderedMetricMap;
+  OrderedMetricMap metrics;
+  AttributeMap attrs;
+  {
+    // Snapshot the metrics in this registry (not guaranteed to be a consistent snapshot)
+    std::lock_guard<simple_spinlock> l(lock_);
+    attrs = attributes_;
+    for (const MetricMap::value_type& val : metric_map_) {
+      const MetricPrototype* prototype = val.first;
+      const scoped_refptr<Metric>& metric = val.second;
+
+      if (select_all || MatchMetricInList(prototype->name(), requested_metrics)) {
+        InsertOrDie(&metrics, prototype->name(), metric);
+      }
+    }
+  }
+
+  // If we had a filter and we matched neither this entity nor any metrics inside
+  // it, don't print the entity at all.
+  if (!requested_metrics.empty() && !select_all && metrics.empty()) {
+    return Status::OK();
+  }
+
+  writer->StartObject();
+
+  writer->String("type");
+  writer->String(prototype_->name());
+
+  writer->String("id");
+  writer->String(id_);
+
+  if (opts.include_entity_attributes) {
+    writer->String("attributes");
+    writer->StartObject();
+    for (const AttributeMap::value_type& val : attrs) {
+      writer->String(val.first);
+      writer->String(val.second);
+    }
+    writer->EndObject();
+  }
+
+  writer->String("metrics");
+  writer->StartArray();
+  for (OrderedMetricMap::value_type& val : metrics) {
+    const auto& m = val.second;
+    if (m->ModifiedInOrAfterEpoch(opts.only_modified_in_or_after_epoch)) {
+      if (!opts.include_untouched_metrics && m->IsUntouched()) {
+        continue;
+      }
+      WARN_NOT_OK(m->WriteAsJson(writer, opts),
+                  strings::Substitute("Failed to write $0 as JSON", 
val.first));
+    }
+  }
+  writer->EndArray();
+
+  writer->EndObject();
+
+  return Status::OK();
+}
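Putting the writer calls above together, an entity serializes to roughly the following shape; the values are illustrative, the "attributes" object is emitted only when include_entity_attributes is set, and each per-metric object comes from that metric's own WriteAsJson():

    {
      "type": "server",
      "id": "example-entity-id",
      "attributes": { "example_key": "example_value" },
      "metrics": [
        { "name": "test_counter", "value": 1 }
      ]
    }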
+
+void MetricEntity::RetireOldMetrics() {
+  MonoTime now(MonoTime::Now());
+
+  std::lock_guard<simple_spinlock> l(lock_);
+  for (auto it = metric_map_.begin(); it != metric_map_.end();) {
+    const scoped_refptr<Metric>& metric = it->second;
+
+    if (PREDICT_TRUE(!metric->HasOneRef() && published_)) {
+      // The metric is still in use. Note that, in the case of "NeverRetire()", the metric
+      // will have a ref-count of 2 because it is reffed by the 'never_retire_metrics_'
+      // collection.
+
+      // Ensure that it is not marked for later retirement (this could happen in the case
+      // that a metric is un-reffed and then re-reffed later by looking it up from the
+      // registry).
+      metric->retire_time_ = MonoTime();
+      ++it;
+      continue;
+    }
+
+    if (!metric->retire_time_.Initialized()) {
+      VLOG(3) << "Metric " << it->first << " has become un-referenced or 
unpublished. "
+              << "Will retire after the retention interval";
+      // This is the first time we've seen this metric as retirable.
+      metric->retire_time_ =
+          now + MonoDelta::FromMilliseconds(FLAGS_metrics_retirement_age_ms);
+      ++it;
+      continue;
+    }
+
+    // If we've already seen this metric in a previous scan, check if it's
+    // time to retire it yet.
+    if (now < metric->retire_time_) {
+      VLOG(3) << "Metric " << it->first << " is un-referenced, but still 
within "
+              << "the retention interval";
+      ++it;
+      continue;
+    }
+
+
+    VLOG(2) << "Retiring metric " << it->first;
+    metric_map_.erase(it++);
+  }
+}
+
+void MetricEntity::NeverRetire(const scoped_refptr<Metric>& metric) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  never_retire_metrics_.push_back(metric);
+}
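NeverRetire() is for callers that instantiate a metric, drop their own reference, and still want it to outlive the retirement scan above: the extra reference held in never_retire_metrics_ keeps HasOneRef() from ever becoming true. A hedged usage sketch, reusing the test counter prototype purely for illustration:

    // Hypothetical example: bump a counter once at startup and keep it alive
    // without retaining a local reference.
    void RecordStartup(const scoped_refptr<MetricEntity>& entity) {
      scoped_refptr<Counter> c = METRIC_test_counter.Instantiate(entity);
      c->Increment();
      entity->NeverRetire(c);
      // 'c' may now go out of scope; the entity keeps the metric from retiring.
    }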
+
+void MetricEntity::SetAttributes(const AttributeMap& attrs) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  attributes_ = attrs;
+}
+
+void MetricEntity::SetAttribute(const string& key, const string& val) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  attributes_[key] = val;
+}
+
+//
+// MetricRegistry
+//
+
+MetricRegistry::MetricRegistry() {
+}
+
+MetricRegistry::~MetricRegistry() {
+}
+
+Status MetricRegistry::WriteAsJson(JsonWriter* writer,
+                                   const vector<string>& requested_metrics,
+                                   const MetricJsonOptions& opts) const {
+  EntityMap entities;
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    entities = entities_;
+  }
+
+  writer->StartArray();
+  for (const auto& e : entities) {
+    WARN_NOT_OK(e.second->WriteAsJson(writer, requested_metrics, opts),
+                Substitute("Failed to write entity $0 as JSON", 
e.second->id()));
+  }
+  writer->EndArray();
+
+  // Rather than having a thread poll metrics periodically to retire old ones,
+  // we'll just retire them here. The only downside is that, if no one is polling
+  // metrics, we may end up leaving them around indefinitely; however, metrics are
+  // small, and one might consider it a feature: if monitoring stops polling for
+  // metrics, we should keep them around until the next poll.
+  entities.clear(); // necessary to deref metrics we just dumped before doing retirement scan.
+  const_cast<MetricRegistry*>(this)->RetireOldMetrics();
+  return Status::OK();
+}
+
+void MetricRegistry::RetireOldMetrics() {
+  std::lock_guard<simple_spinlock> l(lock_);
+  for (auto it = entities_.begin(); it != entities_.end();) {
+    it->second->RetireOldMetrics();
+
+    if (it->second->num_metrics() == 0 &&
+        (it->second->HasOneRef() || !it->second->published())) {
+      // This entity has no metrics and either has no more external references or has
+      // been marked as unpublished, so we can remove it.
+      // Unlike retiring the metrics themselves, we don't wait for any timeout
+      // to retire them -- we assume that the timed retention has been satisfied
+      // by holding onto the metrics inside the entity.
+      entities_.erase(it++);
+    } else {
+      ++it;
+    }
+  }
+}
+
+//
+// MetricPrototypeRegistry
+//
+MetricPrototypeRegistry* MetricPrototypeRegistry::get() {
+  return Singleton<MetricPrototypeRegistry>::get();
+}
+
+void MetricPrototypeRegistry::AddMetric(const MetricPrototype* prototype) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  metrics_.push_back(prototype);
+}
+
+void MetricPrototypeRegistry::AddEntity(const MetricEntityPrototype* prototype) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  entities_.push_back(prototype);
+}
+
+void MetricPrototypeRegistry::WriteAsJson(JsonWriter* writer) const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  MetricJsonOptions opts;
+  opts.include_schema_info = true;
+  writer->StartObject();
+
+  // Dump metric prototypes.
+  writer->String("metrics");
+  writer->StartArray();
+  for (const MetricPrototype* p : metrics_) {
+    writer->StartObject();
+    p->WriteFields(writer, opts);
+    writer->String("entity_type");
+    writer->String(p->entity_type());
+    writer->EndObject();
+  }
+  writer->EndArray();
+
+  // Dump entity prototypes.
+  writer->String("entities");
+  writer->StartArray();
+  for (const MetricEntityPrototype* p : entities_) {
+    writer->StartObject();
+    writer->String("name");
+    writer->String(p->name());
+    writer->EndObject();
+  }
+  writer->EndArray();
+
+  writer->EndObject();
+}
+
+void MetricPrototypeRegistry::WriteAsJson() const {
+  std::ostringstream s;
+  JsonWriter w(&s, JsonWriter::PRETTY);
+  WriteAsJson(&w);
+  std::cout << s.str() << std::endl;
+}
+
+//
+// MetricPrototype
+//
+MetricPrototype::MetricPrototype(CtorArgs args) : args_(args) {
+  MetricPrototypeRegistry::get()->AddMetric(this);
+}
+
+void MetricPrototype::WriteFields(JsonWriter* writer,
+                                  const MetricJsonOptions& opts) const {
+  writer->String("name");
+  writer->String(name());
+
+  if (opts.include_schema_info) {
+    writer->String("label");
+    writer->String(label());
+
+    writer->String("type");
+    writer->String(MetricType::Name(type()));
+
+    writer->String("unit");
+    writer->String(MetricUnit::Name(unit()));
+
+    writer->String("description");
+    writer->String(description());
+  }
+}
+
+//
+// FunctionGaugeDetacher
+//
+
+FunctionGaugeDetacher::FunctionGaugeDetacher() {
+}
+
+FunctionGaugeDetacher::~FunctionGaugeDetacher() {
+  for (const Closure& c : callbacks_) {
+    c.Run();
+  }
+}
+
+scoped_refptr<MetricEntity> MetricRegistry::FindOrCreateEntity(
+    const MetricEntityPrototype* prototype,
+    const std::string& id,
+    const MetricEntity::AttributeMap& initial_attributes) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  scoped_refptr<MetricEntity> e = FindPtrOrNull(entities_, id);
+  if (!e) {
+    e = new MetricEntity(prototype, id, initial_attributes);
+    InsertOrDie(&entities_, id, e);
+  } else if (!e->published()) {
+    e = new MetricEntity(prototype, id, initial_attributes);
+    entities_[id] = e;
+  } else {
+    e->SetAttributes(initial_attributes);
+  }
+  return e;
+}
+
+//
+// Metric
+//
+
+std::atomic<int64_t> Metric::g_epoch_;
+
+Metric::Metric(const MetricPrototype* prototype)
+    : prototype_(prototype),
+      m_epoch_(current_epoch()) {
+}
+
+Metric::~Metric() {
+}
+
+void Metric::IncrementEpoch() {
+  g_epoch_++;
+}
+
+void Metric::UpdateModificationEpochSlowPath() {
+  int64_t new_epoch, old_epoch;
+  // CAS loop to ensure that we never transition a metric's epoch backwards
+  // even if multiple threads race to update it.
+  do {
+    old_epoch = m_epoch_;
+    new_epoch = g_epoch_;
+  } while (old_epoch < new_epoch &&
+           !m_epoch_.compare_exchange_weak(old_epoch, new_epoch));
+}
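The CAS loop above only ever moves m_epoch_ forward: if another thread has already installed a newer epoch, old_epoch re-reads as the larger value and the loop exits without writing. A hedged illustration of the resulting behavior, written as a test against the fixture and accessors that appear in metrics-test.cc above:

    TEST_F(MetricsTest, ModificationEpochIsMonotonic) {
      scoped_refptr<Counter> c = METRIC_test_counter.Instantiate(entity_);
      Metric::IncrementEpoch();
      c->Increment();  // records the now-current epoch on the metric
      int64_t epoch_at_modification = Metric::current_epoch();
      ASSERT_TRUE(c->ModifiedInOrAfterEpoch(epoch_at_modification));
      // Asking for "modified at or after" an older epoch also matches; the
      // recorded epoch never moves backwards.
      ASSERT_TRUE(c->ModifiedInOrAfterEpoch(epoch_at_modification - 1));
    }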
+
+//
+// Gauge
+//
+
+Status Gauge::WriteAsJson(JsonWriter* writer,
+                          const MetricJsonOptions& opts) const {
+  writer->StartObject();
+
+  prototype_->WriteFields(writer, opts);
+
+  writer->String("value");
+  WriteValue(writer);
+
+  writer->EndObject();
+  return Status::OK();
+}
+
+//
+// StringGauge
+//
+
+StringGauge::StringGauge(const GaugePrototype<string>* proto,
+                         string initial_value)
+    : Gauge(proto), value_(std::move(initial_value)) {}
+
+std::string StringGauge::value() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return value_;
+}
+
+void StringGauge::set_value(const std::string& value) {
+  UpdateModificationEpoch();
+  std::lock_guard<simple_spinlock> l(lock_);
+  value_ = value;
+}
+
+void StringGauge::WriteValue(JsonWriter* writer) const {
+  writer->String(value());
+}
+
+//
+// Counter
+//
+// This implementation is optimized by using a striped counter. See LongAdder for details.
+
+scoped_refptr<Counter> CounterPrototype::Instantiate(const scoped_refptr<MetricEntity>& entity) {
+  return entity->FindOrCreateCounter(this);
+}
+
+Counter::Counter(const CounterPrototype* proto) : Metric(proto) {
+}
+
+int64_t Counter::value() const {
+  return value_.Value();
+}
+
+void Counter::Increment() {
+  IncrementBy(1);
+}
+
+void Counter::IncrementBy(int64_t amount) {
+  UpdateModificationEpoch();
+  value_.IncrementBy(amount);
+}
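The LongAdder referenced in the section comment reduces cache-line contention by spreading increments across several padded cells and summing them on read. A greatly simplified stand-in for that idea (not Kudu's LongAdder, which sizes its cell array adaptively) might look like this:

    #include <atomic>
    #include <cstddef>
    #include <cstdint>
    #include <functional>
    #include <thread>

    // Simplified striped counter: writers hash their thread id to one of a
    // fixed number of cache-line-aligned cells; reads sum every cell.
    class SimpleStripedCounter {
     public:
      void IncrementBy(int64_t amount) {
        std::size_t idx =
            std::hash<std::thread::id>()(std::this_thread::get_id()) % kCells;
        cells_[idx].value.fetch_add(amount, std::memory_order_relaxed);
      }
      int64_t Value() const {
        int64_t sum = 0;
        for (const auto& cell : cells_) {
          sum += cell.value.load(std::memory_order_relaxed);
        }
        return sum;
      }
     private:
      static constexpr int kCells = 16;
      struct alignas(64) Cell { std::atomic<int64_t> value{0}; };
      Cell cells_[kCells];
    };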
+
+Status Counter::WriteAsJson(JsonWriter* writer,
+                            const MetricJsonOptions& opts) const {
+  writer->StartObject();
+
+  prototype_->WriteFields(writer, opts);
+
+  writer->String("value");
+  writer->Int64(value());
+
+  writer->EndObject();
+  return Status::OK();
+}
+
+/////////////////////////////////////////////////
+// HistogramPrototype
+/////////////////////////////////////////////////
+
+HistogramPrototype::HistogramPrototype(const MetricPrototype::CtorArgs& args,
+                                       uint64_t max_trackable_value, int num_sig_digits)
+  : MetricPrototype(args),
+    max_trackable_value_(max_trackable_value),
+    num_sig_digits_(num_sig_digits) {
+  // Better to crash at definition time than at instantiation time.
+  CHECK(HdrHistogram::IsValidHighestTrackableValue(max_trackable_value))
+      << Substitute("Invalid max trackable value on histogram $0: $1",
+                    args.name_, max_trackable_value);
+  CHECK(HdrHistogram::IsValidNumSignificantDigits(num_sig_digits))
+      << Substitute("Invalid number of significant digits on histogram $0: $1",
+                    args.name_, num_sig_digits);
+}
+
+scoped_refptr<Histogram> HistogramPrototype::Instantiate(
+    const scoped_refptr<MetricEntity>& entity) {
+  return entity->FindOrCreateHistogram(this);
+}
+
+/////////////////////////////////////////////////
+// Histogram
+/////////////////////////////////////////////////
+
+Histogram::Histogram(const HistogramPrototype* proto)
+  : Metric(proto),
+    histogram_(new HdrHistogram(proto->max_trackable_value(), proto->num_sig_digits())) {
+}
+
+void Histogram::Increment(int64_t value) {
+  UpdateModificationEpoch();
+  histogram_->Increment(value);
+}
+
+void Histogram::IncrementBy(int64_t value, int64_t amount) {
+  UpdateModificationEpoch();
+  histogram_->IncrementBy(value, amount);
+}
+
+Status Histogram::WriteAsJson(JsonWriter* writer,
+                              const MetricJsonOptions& opts) const {
+
+  HistogramSnapshotPB snapshot;
+  RETURN_NOT_OK(GetHistogramSnapshotPB(&snapshot, opts));
+  writer->Protobuf(snapshot);
+  return Status::OK();
+}
+
+Status Histogram::GetHistogramSnapshotPB(HistogramSnapshotPB* snapshot_pb,
+                                         const MetricJsonOptions& opts) const {
+  snapshot_pb->set_name(prototype_->name());
+  if (opts.include_schema_info) {
+    snapshot_pb->set_type(MetricType::Name(prototype_->type()));
+    snapshot_pb->set_label(prototype_->label());
+    snapshot_pb->set_unit(MetricUnit::Name(prototype_->unit()));
+    snapshot_pb->set_description(prototype_->description());
+    snapshot_pb->set_max_trackable_value(histogram_->highest_trackable_value());
+    snapshot_pb->set_num_significant_digits(histogram_->num_significant_digits());
+  }
+  // Fast-path for a reasonably common case of an empty histogram. This occurs
+  // when a histogram is tracking some information about a feature not in
+  // use, for example.
+  if (histogram_->TotalCount() == 0) {
+    snapshot_pb->set_total_count(0);
+    snapshot_pb->set_total_sum(0);
+    snapshot_pb->set_min(0);
+    snapshot_pb->set_mean(0);
+    snapshot_pb->set_percentile_75(0);
+    snapshot_pb->set_percentile_95(0);
+    snapshot_pb->set_percentile_99(0);
+    snapshot_pb->set_percentile_99_9(0);
+    snapshot_pb->set_percentile_99_99(0);
+    snapshot_pb->set_max(0);
+  } else {
+    HdrHistogram snapshot(*histogram_);
+    snapshot_pb->set_total_count(snapshot.TotalCount());
+    snapshot_pb->set_total_sum(snapshot.TotalSum());
+    snapshot_pb->set_min(snapshot.MinValue());
+    snapshot_pb->set_mean(snapshot.MeanValue());
+    snapshot_pb->set_percentile_75(snapshot.ValueAtPercentile(75));
+    snapshot_pb->set_percentile_95(snapshot.ValueAtPercentile(95));
+    snapshot_pb->set_percentile_99(snapshot.ValueAtPercentile(99));
+    snapshot_pb->set_percentile_99_9(snapshot.ValueAtPercentile(99.9));
+    snapshot_pb->set_percentile_99_99(snapshot.ValueAtPercentile(99.99));
+    snapshot_pb->set_max(snapshot.MaxValue());
+
+    if (opts.include_raw_histograms) {
+      RecordedValuesIterator iter(&snapshot);
+      while (iter.HasNext()) {
+        HistogramIterationValue value;
+        RETURN_NOT_OK(iter.Next(&value));
+        snapshot_pb->add_values(value.value_iterated_to);
+        snapshot_pb->add_counts(value.count_at_value_iterated_to);
+      }
+    }
+  }
+  return Status::OK();
+}
+
+uint64_t Histogram::CountInBucketForValueForTests(uint64_t value) const {
+  return histogram_->CountInBucketForValue(value);
+}
+
+uint64_t Histogram::TotalCount() const {
+  return histogram_->TotalCount();
+}
+
+uint64_t Histogram::MinValueForTests() const {
+  return histogram_->MinValue();
+}
+
+uint64_t Histogram::MaxValueForTests() const {
+  return histogram_->MaxValue();
+}
+double Histogram::MeanValueForTests() const {
+  return histogram_->MeanValue();
+}
+
+ScopedLatencyMetric::ScopedLatencyMetric(Histogram* latency_hist)
+  : latency_hist_(latency_hist) {
+  if (latency_hist_) {
+    time_started_ = MonoTime::Now();
+  }
+}
+
+ScopedLatencyMetric::~ScopedLatencyMetric() {
+  if (latency_hist_ != nullptr) {
+    MonoTime time_now = MonoTime::Now();
+    latency_hist_->Increment((time_now - time_started_).ToMicroseconds());
+  }
+}
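ScopedLatencyMetric is the RAII entry point into Histogram::Increment(): construct it at the top of the scope being timed and the destructor records the elapsed wall time in microseconds. A minimal usage sketch, reusing the test histogram prototype from the test file purely for illustration:

    void DoTimedWork(const scoped_refptr<MetricEntity>& entity) {
      scoped_refptr<Histogram> hist = METRIC_test_hist.Instantiate(entity);
      {
        ScopedLatencyMetric timer(hist.get());
        // ... work being measured ...
      }  // destructor adds the elapsed microseconds to 'hist'
    }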
+
+} // namespace kudu
