http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fff2fab3/storage/CompactKeySeparateChainingHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CompactKeySeparateChainingHashTable.hpp 
b/storage/CompactKeySeparateChainingHashTable.hpp
new file mode 100644
index 0000000..0d057e4
--- /dev/null
+++ b/storage/CompactKeySeparateChainingHashTable.hpp
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_COMPACT_KEY_SEPARATE_CHAINING_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_COMPACT_KEY_SEPARATE_CHAINING_HASH_TABLE_HPP_
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "cli/Flags.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/StorageConstants.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "utility/Macros.hpp"
+#include "utility/Range.hpp"
+#include "utility/ScopedArray.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class AggregationHandle;
+class ColumnVectorsValueAccessor;
+class StorageManager;
+
+/**
+ * @brief Aggregation-state hash table that identifies every key by a single
+ *        64-bit "compact key" code and resolves collisions with separate
+ *        chaining.
+ *
+ * Storage is two flat arrays: slots_ (chain heads, indexed by
+ * key_code % num_slots_) and key_buckets_ (chain nodes). Chain links hold
+ * 1-based bucket indices: 0 means "end of chain" and kExclusiveState marks a
+ * link that a thread is currently updating.
+ **/
+class CompactKeySeparateChainingHashTable
+    : public AggregationStateHashTableBase {
+ public:
+  /**
+   * @brief Constructor (defined out of line).
+   *
+   * NOTE(review): the meaning of each argument is fixed by the out-of-line
+   * definition, which is not visible from this header; the names mirror the
+   * other aggregation hash table constructors.
+   **/
+  CompactKeySeparateChainingHashTable(
+      const std::vector<const Type*> &key_types,
+      const std::size_t num_entries,
+      const std::vector<AggregationHandle *> &handles,
+      StorageManager *storage_manager);
+
+  // Defined out of line.
+  bool upsertValueAccessorCompositeKey(
+      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_attr_ids,
+      const ValueAccessorMultiplexer &accessor_mux) override;
+
+  // Intentionally a no-op for this table.
+  void destroyPayload() override {}
+
+  HashTableImplType getImplType() const override {
+    return HashTableImplType::kCompactKeySeparateChaining;
+  }
+
+  /**
+   * @return The combined byte size of the slot array and the key bucket
+   *         array.
+   **/
+  std::size_t getMemoryConsumptionBytes() const override {
+    return kSlotDataSize * num_slots_ + kKeyBucketDataSize * num_key_buckets_;
+  }
+
+  /**
+   * @return The number of partitions accepted by initialize().
+   **/
+  inline std::size_t getNumInitializationPartitions() const {
+    return slots_init_splitter_->getNumPartitions();
+  }
+
+  /**
+   * @brief Get the number of partitions accepted by finalizeKeys().
+   *
+   * The splitter over [0, buckets_allocated_) is created on the first call
+   * and reused afterwards.
+   *
+   * NOTE(review): the lazy creation is not synchronized; presumably this is
+   * first called from a single thread after all upserts are done - confirm
+   * against the caller.
+   *
+   * @return The number of finalization partitions.
+   **/
+  inline std::size_t getNumFinalizationPartitions() const {
+    if (final_splitter_ == nullptr) {
+      final_splitter_ = std::make_unique<RangeSplitter>(
+          RangeSplitter::CreateWithPartitionLength(
+              0, buckets_allocated_.load(std::memory_order_relaxed),
+              kFinalMinPartitionLength, FLAGS_num_workers * 2));
+    }
+    return final_splitter_->getNumPartitions();
+  }
+
+  /**
+   * @brief Zero-fill the part of the slot array and the part of the key
+   *        bucket array that belong to one initialization partition.
+   *
+   * @param partition_id The partition to initialize; looked up in both the
+   *        slot splitter and the key bucket splitter.
+   **/
+  void initialize(const std::size_t partition_id) {
+    const Range slots_range = slots_init_splitter_->getPartition(partition_id);
+    std::memset(slots_.get() + slots_range.begin(),
+                0,
+                slots_range.size() * kSlotDataSize);
+
+    const Range key_buckets_range =
+        key_buckets_init_splitter_->getPartition(partition_id);
+    std::memset(key_buckets_.get() + key_buckets_range.begin(),
+                0,
+                key_buckets_range.size() * kKeyBucketDataSize);
+  }
+
+  // Defined out of line.
+  void finalizeKeys(const std::size_t partition_id,
+                    ColumnVectorsValueAccessor *output) const;
+
+ private:
+  // Rounds actual_bytes up to the next multiple of kCacheLineBytes.
+  inline static std::size_t CacheLineAlignedBytes(
+      const std::size_t actual_bytes) {
+    return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes
+               * kCacheLineBytes;
+  }
+
+  inline static std::size_t CalculateNumInitializationPartitions(
+      const std::size_t memory_size) {
+    // Set initialization memory block size as 4MB.
+    constexpr std::size_t kInitBlockSize = 4uL * 1024u * 1024u;
+
+    // At least 1 partition, at most 80 partitions. The template argument is
+    // spelled out so that the call also compiles on platforms where
+    // std::size_t is not unsigned long.
+    return std::max<std::size_t>(
+        1u, std::min<std::size_t>(memory_size / kInitBlockSize, 80u));
+  }
+
+  inline static std::size_t CalculateNumFinalizationPartitions(
+      const std::size_t num_entries) {
+    // Set finalization segment size as 4096 entries.
+    constexpr std::size_t kFinalizeSegmentSize = 4uL * 1024L;
+
+    // At least 1 partition, at most 80 partitions (see the note on
+    // std::size_t in CalculateNumInitializationPartitions()).
+    return std::max<std::size_t>(
+        1u, std::min<std::size_t>(num_entries / kFinalizeSegmentSize, 80u));
+  }
+
+  using KeyCode = std::uint64_t;
+  using BucketIndex = std::uint32_t;
+
+  // Finds the bucket holding key_code, appending a new bucket if absent.
+  inline BucketIndex locateBucketInternal(const KeyCode key_code);
+
+  // Copies key_size bytes of attribute attr from each of num_tuples tuples
+  // into the corresponding key code, starting at byte position offset.
+  template <typename ValueAccessorT>
+  inline void constructCompactKeyCodeComponent(const std::size_t num_tuples,
+                                               const std::size_t offset,
+                                               const std::size_t key_size,
+                                               ValueAccessorT *accessor,
+                                               const attribute_id attr,
+                                               KeyCode *key_codes);
+
+  static constexpr std::size_t kInitMinPartitionLength = 1024uL * 256uL;
+  static constexpr std::size_t kFinalMinPartitionLength = 1024uL * 4uL;
+
+  // One chain node: the stored key code plus the link to the next node.
+  struct KeyBucket {
+    KeyCode key_code;
+    std::atomic<BucketIndex> next;
+  };
+
+  static constexpr std::size_t kSlotDataSize =
+      sizeof(std::atomic<BucketIndex>);
+  static constexpr std::size_t kKeyBucketDataSize = sizeof(KeyBucket);
+
+  // Link value meaning "another thread is currently appending here".
+  static constexpr BucketIndex kExclusiveState =
+      std::numeric_limits<BucketIndex>::max();
+
+  const std::vector<const Type*> key_types_;
+  std::vector<std::size_t> key_sizes_;
+
+  ScopedArray<std::atomic<BucketIndex>> slots_;
+  ScopedArray<KeyBucket> key_buckets_;
+
+  std::size_t num_slots_;
+  std::size_t num_key_buckets_;
+  std::atomic<std::size_t> buckets_allocated_;
+
+  std::unique_ptr<RangeSplitter> slots_init_splitter_;
+  std::unique_ptr<RangeSplitter> key_buckets_init_splitter_;
+  mutable std::unique_ptr<RangeSplitter> final_splitter_;
+
+  DISALLOW_COPY_AND_ASSIGN(CompactKeySeparateChainingHashTable);
+};
+
+// ----------------------------------------------------------------------------
+// Implementations of class methods follow.
+
+/**
+ * @brief Find the key bucket that holds key_code, appending a new bucket to
+ *        the end of the chain when the key is not present yet.
+ *
+ * The chain starts at slots_[key_code % num_slots_]. Links store bucket
+ * index + 1, so 0 means "no next bucket". A thread that wants to append
+ * first swaps the terminating 0 for kExclusiveState, fills in the new
+ * bucket, and then publishes the real link; other threads spin while they
+ * observe kExclusiveState.
+ *
+ * @param key_code The compact key code to look up or insert.
+ * @return The 0-based index into key_buckets_ of the bucket for key_code.
+ **/
+inline CompactKeySeparateChainingHashTable::BucketIndex
+    CompactKeySeparateChainingHashTable::locateBucketInternal(
+        const KeyCode key_code) {
+  std::atomic<BucketIndex> *pending_chain = &slots_[key_code % num_slots_];
+
+  for (;;) {
+    BucketIndex existing_chain = 0;
+
+    // Check if current node is the end of the chain. On success the link is
+    // held exclusively by this thread until the store below.
+    if (pending_chain->compare_exchange_strong(existing_chain,
+                                               kExclusiveState,
+                                               std::memory_order_acq_rel)) {
+      // Keep the counter at full width so that the capacity check below is
+      // not defeated by truncation to BucketIndex.
+      const std::size_t allocated =
+          buckets_allocated_.fetch_add(1, std::memory_order_relaxed);
+
+      // TODO(jianqiao): Resize.
+      // Valid bucket indices are [0, num_key_buckets_), so an index equal to
+      // num_key_buckets_ is already out of bounds.
+      if (allocated >= num_key_buckets_) {
+        LOG(FATAL) << "Need resize, not handled";
+      }
+
+      const BucketIndex bucket_index = static_cast<BucketIndex>(allocated);
+
+      // Store key code into key bucket.
+      key_buckets_[bucket_index].key_code = key_code;
+
+      // Update the chaining pointer to point to the new node. The release
+      // store publishes the key code written above.
+      pending_chain->store(bucket_index + 1, std::memory_order_release);
+
+      return bucket_index;
+    }
+
+    // Spin until the pointer is available.
+    while (existing_chain == kExclusiveState) {
+      existing_chain = pending_chain->load(std::memory_order_acquire);
+    }
+
+    if (existing_chain == 0) {
+      // Other thread had to roll back, so try again.
+      continue;
+    }
+
+    // Links are 1-based; convert back to a bucket index.
+    const BucketIndex bucket_index = existing_chain - 1;
+    KeyBucket &key_bucket = key_buckets_[bucket_index];
+    if (key_bucket.key_code == key_code) {
+      return bucket_index;
+    } else {
+      // Not a match: continue with the next link of the chain.
+      pending_chain = &key_bucket.next;
+    }
+  }
+}
+
+/**
+ * @brief Write one key component into every key code of a batch.
+ *
+ * Restarts iteration on the accessor and, for each of the first num_tuples
+ * tuples, copies key_size raw bytes of attribute attr into the tuple's key
+ * code at byte position offset.
+ *
+ * @param num_tuples The number of tuples to read from accessor.
+ * @param offset The byte offset of this component inside a KeyCode.
+ * @param key_size The number of bytes to copy for this component.
+ * @param accessor The value accessor that supplies the tuples.
+ * @param attr The attribute to read from each tuple.
+ * @param key_codes The output array, one KeyCode per tuple.
+ **/
+template <typename ValueAccessorT>
+inline void CompactKeySeparateChainingHashTable
+    ::constructCompactKeyCodeComponent(const std::size_t num_tuples,
+                                       const std::size_t offset,
+                                       const std::size_t key_size,
+                                       ValueAccessorT *accessor,
+                                       const attribute_id attr,
+                                       KeyCode *key_codes) {
+  accessor->beginIteration();
+
+  std::size_t tuple_idx = 0;
+  while (tuple_idx != num_tuples) {
+    accessor->next();
+
+    char *destination =
+        reinterpret_cast<char*>(&key_codes[tuple_idx]) + offset;
+    const void *source = accessor->template getUntypedValue<false>(attr);
+    std::memcpy(destination, source, key_size);
+
+    ++tuple_idx;
+  }
+}
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_COMPACT_KEY_SEPARATE_CHAINING_HASH_TABLE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fff2fab3/storage/Flags.hpp
----------------------------------------------------------------------
diff --git a/storage/Flags.hpp b/storage/Flags.hpp
index 1d5527c..87f7da4 100644
--- a/storage/Flags.hpp
+++ b/storage/Flags.hpp
@@ -41,7 +41,6 @@ DECLARE_bool(use_hdfs);
 DECLARE_string(hdfs_namenode_host);
 DECLARE_int32(hdfs_namenode_port);
 DECLARE_int32(hdfs_num_replications);
-
 #endif
 
 /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fff2fab3/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index d489b9f..40c8e32 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -23,16 +23,11 @@ import "types/Type.proto";
 
 enum HashTableImplType {
   COLLISION_FREE_VECTOR = 0;
-  LINEAR_OPEN_ADDRESSING = 1;
-  SEPARATE_CHAINING = 2;
-  SIMPLE_SCALAR_SEPARATE_CHAINING = 3;
-  THREAD_PRIVATE_COMPACT_KEY = 4;
-}
-
-message CollisionFreeVectorInfo {
-  required uint64 memory_size = 1;
-  required uint64 num_init_partitions = 2;
-  repeated uint64 state_offsets = 3;
+  COMPACT_KEY_SEPARATE_CHAINING = 1;
+  LINEAR_OPEN_ADDRESSING = 2;
+  SEPARATE_CHAINING = 3;
+  SIMPLE_SCALAR_SEPARATE_CHAINING = 4;
+  THREAD_PRIVATE_COMPACT_KEY = 5;
 }
 
 // NOTE(chasseur): This proto describes the run-time parameters for a resizable

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fff2fab3/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index 4d9310c..2e89554 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -42,6 +42,7 @@ class ValueAccessor;
  **/
 enum class HashTableImplType {
   kCollisionFreeVector,
+  kCompactKeySeparateChaining,
   kLinearOpenAddressing,
   kSeparateChaining,
   kSimpleScalarSeparateChaining,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fff2fab3/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 732920f..df71100 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "storage/CollisionFreeVectorTable.hpp"
+#include "storage/CompactKeySeparateChainingHashTable.hpp"
 #include "storage/HashTable.hpp"
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTable.pb.h"
@@ -118,6 +119,8 @@ inline HashTableImplType HashTableImplTypeFromProto(
   switch (proto_type) {
     case serialization::HashTableImplType::COLLISION_FREE_VECTOR:
       return HashTableImplType::kCollisionFreeVector;
+    case serialization::HashTableImplType::COMPACT_KEY_SEPARATE_CHAINING:
+      return HashTableImplType::kCompactKeySeparateChaining;
     case serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING:
       return HashTableImplType::kLinearOpenAddressing;
     case serialization::HashTableImplType::SEPARATE_CHAINING:
@@ -356,14 +359,6 @@ class AggregationStateHashTableFactory {
    * @param storage_manager The StorageManager to use (a StorageBlob will be
    *        allocated to hold the hash table's contents). Forwarded as-is to 
the
    *        hash table constructor.
-   * @param num_partitions The number of partitions of this aggregation state
-   *        hash table.
-   * @param collision_free_vector_memory_size For CollisionFreeVectorTable,
-   *        the memory size.
-   * @param collision_free_vector_num_init_partitions For
-   *        CollisionFreeVectorTable, the number of partitions to initialize.
-   * @param collision_free_vector_state_offsets For CollisionFreeVectorTable,
-   *        the offsets for each state.
    * @return A new aggregation state hash table.
    **/
   static AggregationStateHashTableBase* CreateResizable(
@@ -371,18 +366,15 @@ class AggregationStateHashTableFactory {
       const std::vector<const Type*> &key_types,
       const std::size_t num_entries,
       const std::vector<AggregationHandle *> &handles,
-      StorageManager *storage_manager,
-      const std::size_t num_partitions = 1u,
-      const std::size_t collision_free_vector_memory_size = 0,
-      const std::size_t collision_free_vector_num_init_partitions = 0,
-      const std::vector<std::size_t> &collision_free_vector_state_offsets = 
std::vector<std::size_t>()) {
+      StorageManager *storage_manager) {
     switch (hash_table_type) {
       case HashTableImplType::kCollisionFreeVector:
         DCHECK_EQ(1u, key_types.size());
         return new CollisionFreeVectorTable(
-            key_types.front(), num_entries, collision_free_vector_memory_size,
-            collision_free_vector_num_init_partitions, num_partitions,
-            collision_free_vector_state_offsets, handles, storage_manager);
+            key_types.front(), num_entries, handles, storage_manager);
+      case HashTableImplType::kCompactKeySeparateChaining:
+        return new CompactKeySeparateChainingHashTable(
+            key_types, num_entries, handles, storage_manager);
       case HashTableImplType::kSeparateChaining:
         return new PackedPayloadHashTable(
             key_types, num_entries, handles, storage_manager);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fff2fab3/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index c5c9dd8..c78ec59 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -194,6 +194,8 @@ add_library(quickstep_utility_PrimeNumber PrimeNumber.cpp 
PrimeNumber.hpp)
 add_library(quickstep_utility_PtrList ../empty_src.cpp PtrList.hpp)
 add_library(quickstep_utility_PtrMap ../empty_src.cpp PtrMap.hpp)
 add_library(quickstep_utility_PtrVector ../empty_src.cpp PtrVector.hpp)
+add_library(quickstep_utility_Range ../empty_src.cpp Range.hpp)
+add_library(quickstep_utility_ScopedArray ../empty_src.cpp ScopedArray.hpp)
 add_library(quickstep_utility_ScopedBuffer ../empty_src.cpp ScopedBuffer.hpp)
 add_library(quickstep_utility_ScopedDeleter ../empty_src.cpp ScopedDeleter.hpp)
 add_library(quickstep_utility_ScopedReassignment ../empty_src.cpp 
ScopedReassignment.hpp)
@@ -308,6 +310,12 @@ target_link_libraries(quickstep_utility_PtrMap
 target_link_libraries(quickstep_utility_PtrVector
                       glog
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_Range
+                      glog
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_utility_ScopedArray
+                      quickstep_utility_ScopedBuffer
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_ScopedBuffer
                       glog
                       quickstep_utility_Alignment
@@ -380,6 +388,8 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_PtrList
                       quickstep_utility_PtrMap
                       quickstep_utility_PtrVector
+                      quickstep_utility_Range
+                      quickstep_utility_ScopedArray
                       quickstep_utility_ScopedBuffer
                       quickstep_utility_ScopedDeleter
                       quickstep_utility_ScopedReassignment

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fff2fab3/utility/ExecutionDAGVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/ExecutionDAGVisualizer.cpp 
b/utility/ExecutionDAGVisualizer.cpp
index f009a72..bb8fe8f 100644
--- a/utility/ExecutionDAGVisualizer.cpp
+++ b/utility/ExecutionDAGVisualizer.cpp
@@ -234,6 +234,9 @@ void ExecutionDAGVisualizer::bindProfilingStats(
         std::max(time_end[relop_index], workorder_end_time);
     time_elapsed[relop_index] += (workorder_end_time - workorder_start_time);
 
+    if (workorders_count.find(relop_index) == workorders_count.end()) {
+      workorders_count[relop_index] = 0;
+    }
     ++workorders_count[relop_index];
     if (mean_time_per_workorder.find(relop_index) ==
         mean_time_per_workorder.end()) {
@@ -289,6 +292,7 @@ void ExecutionDAGVisualizer::bindProfilingStats(
       node_info.labels.emplace_back(
           "effective concurrency: " + FormatDigits(concurrency, 2));
 
+      DCHECK(workorders_count.find(node_index) != workorders_count.end());
       const std::size_t workorders_count_for_node = 
workorders_count[node_index];
       if (workorders_count_for_node > 0) {
         mean_time_per_workorder[node_index] =

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fff2fab3/utility/Range.hpp
----------------------------------------------------------------------
diff --git a/utility/Range.hpp b/utility/Range.hpp
new file mode 100644
index 0000000..4fb55be
--- /dev/null
+++ b/utility/Range.hpp
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_RANGE_HPP_
+#define QUICKSTEP_UTILITY_RANGE_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <limits>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief An immutable half-open interval [begin, end) of indices.
+ **/
+class Range {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param begin The first index of the interval.
+   * @param end One past the last index; must not be less than begin.
+   **/
+  Range(const std::size_t begin, const std::size_t end)
+      : begin_(begin), end_(end) {
+    DCHECK_LE(begin_, end_);
+  }
+
+  /**
+   * @brief Copy constructor; re-validates the copied bounds.
+   **/
+  Range(const Range &range)
+      : Range(range.begin_, range.end_) {}
+
+  /**
+   * @return The first index of the interval.
+   **/
+  inline std::size_t begin() const { return begin_; }
+
+  /**
+   * @return One past the last index of the interval.
+   **/
+  inline std::size_t end() const { return end_; }
+
+  /**
+   * @return The number of indices covered by the interval.
+   **/
+  inline std::size_t size() const { return end() - begin(); }
+
+ private:
+  const std::size_t begin_;
+  const std::size_t end_;
+};
+
+/**
+ * @brief Splits a half-open range [begin, end) into consecutive partitions of
+ *        equal length; the last partition additionally absorbs whatever
+ *        remainder is left up to end.
+ **/
+class RangeSplitter {
+ public:
+  /**
+   * @brief Create a splitter from a minimum partition length.
+   *
+   * @param begin The start of the range.
+   * @param end The end (exclusive) of the range.
+   * @param min_partition_length The range length is divided by this value to
+   *        estimate the number of partitions. Must be positive.
+   * @param max_num_partitions Upper bound on the number of partitions. Must
+   *        be positive.
+   * @return A splitter with at least one partition.
+   **/
+  static RangeSplitter CreateWithPartitionLength(
+      const std::size_t begin,
+      const std::size_t end,
+      const std::size_t min_partition_length,
+      const std::size_t max_num_partitions = kMaxNumPartitions) {
+    DCHECK_LE(begin, end);
+    DCHECK_GT(min_partition_length, 0u);
+    DCHECK_GT(max_num_partitions, 0u);
+
+    const std::size_t range_length = end - begin;
+    const std::size_t est_num_partitions = range_length / min_partition_length;
+
+    // Explicit template argument: std::size_t is not unsigned long on every
+    // platform, so a 1uL literal would break argument deduction there.
+    const std::size_t num_partitions = std::max<std::size_t>(
+        1u, std::min(est_num_partitions, max_num_partitions));
+    const std::size_t partition_length = range_length / num_partitions;
+    return RangeSplitter(begin, end, num_partitions, partition_length);
+  }
+
+  /**
+   * @brief Overload of CreateWithPartitionLength() taking a Range.
+   **/
+  static RangeSplitter CreateWithPartitionLength(
+      const Range &range,
+      const std::size_t min_partition_length,
+      const std::size_t max_num_partitions = kMaxNumPartitions) {
+    return CreateWithPartitionLength(
+        range.begin(), range.end(), min_partition_length, max_num_partitions);
+  }
+
+  /**
+   * @brief Create a splitter from an expected number of partitions, keeping
+   *        the partition length within [min_partition_length,
+   *        max_partition_length].
+   *
+   * @param begin The start of the range.
+   * @param end The end (exclusive) of the range.
+   * @param min_partition_length Lower bound on the partition length. Must be
+   *        positive.
+   * @param max_partition_length Upper bound on the partition length. Must
+   *        not be less than min_partition_length.
+   * @param ept_num_partitions The expected number of partitions. Must be
+   *        positive.
+   * @return A splitter with at least one partition. If the range is shorter
+   *         than the chosen partition length, the single partition covers
+   *         the whole range.
+   **/
+  static RangeSplitter CreateWithMinMaxPartitionLength(
+      const std::size_t begin,
+      const std::size_t end,
+      const std::size_t min_partition_length,
+      const std::size_t max_partition_length,
+      const std::size_t ept_num_partitions) {
+    DCHECK_LE(begin, end);
+    DCHECK_LE(min_partition_length, max_partition_length);
+    DCHECK_GT(min_partition_length, 0u);
+    DCHECK_GT(max_partition_length, 0u);
+    // Used as a divisor below.
+    DCHECK_GT(ept_num_partitions, 0u);
+
+    const std::size_t range_length = end - begin;
+    const std::size_t ept_partition_length = range_length / ept_num_partitions;
+
+    std::size_t partition_length;
+    if (ept_partition_length < min_partition_length) {
+      partition_length = min_partition_length;
+    } else if (ept_partition_length > max_partition_length) {
+      partition_length = max_partition_length;
+    } else {
+      partition_length = ept_partition_length;
+    }
+
+    const std::size_t num_partitions =
+        std::max<std::size_t>(1u, range_length / partition_length);
+
+    // A range shorter than the chosen length yields a single partition that
+    // getPartition() maps to [begin, end) anyway; record its true length so
+    // that num_partitions * partition_length never exceeds the range.
+    if (partition_length > range_length) {
+      partition_length = range_length;
+    }
+    return RangeSplitter(begin, end, num_partitions, partition_length);
+  }
+
+  /**
+   * @brief Overload of CreateWithMinMaxPartitionLength() taking a Range.
+   **/
+  static RangeSplitter CreateWithMinMaxPartitionLength(
+      const Range &range,
+      const std::size_t min_partition_length,
+      const std::size_t max_partition_length,
+      const std::size_t ept_num_partitions) {
+    return CreateWithMinMaxPartitionLength(
+        range.begin(), range.end(),
+        min_partition_length, max_partition_length,
+        ept_num_partitions);
+  }
+
+  /**
+   * @brief Create a splitter with exactly num_partitions partitions.
+   *
+   * @param begin The start of the range.
+   * @param end The end (exclusive) of the range.
+   * @param num_partitions The number of partitions. Must be positive.
+   * @return A splitter with num_partitions partitions.
+   **/
+  static RangeSplitter CreateWithNumPartitions(
+      const std::size_t begin,
+      const std::size_t end,
+      const std::size_t num_partitions) {
+    DCHECK_LE(begin, end);
+    DCHECK_GT(num_partitions, 0u);
+
+    const std::size_t partition_length = (end - begin) / num_partitions;
+    return RangeSplitter(begin, end, num_partitions, partition_length);
+  }
+
+  /**
+   * @brief Overload of CreateWithNumPartitions() taking a Range.
+   **/
+  static RangeSplitter CreateWithNumPartitions(
+      const Range &range,
+      const std::size_t num_partitions) {
+    return CreateWithNumPartitions(range.begin(), range.end(), num_partitions);
+  }
+
+  /**
+   * @brief Copy constructor.
+   **/
+  RangeSplitter(const RangeSplitter &other)
+      : begin_(other.begin_), end_(other.end_),
+        num_partitions_(other.num_partitions_),
+        partition_length_(other.partition_length_) {}
+
+  /**
+   * @return The number of partitions.
+   **/
+  inline std::size_t getNumPartitions() const {
+    return num_partitions_;
+  }
+
+  /**
+   * @brief Get the sub-range covered by one partition.
+   *
+   * @param partition_id The partition, in [0, getNumPartitions()).
+   * @return The partition's range. The last partition always extends to the
+   *         end of the whole range.
+   **/
+  inline Range getPartition(const std::size_t partition_id) const {
+    DCHECK_LT(partition_id, num_partitions_);
+    const std::size_t begin = begin_ + partition_length_ * partition_id;
+    const std::size_t end =
+        partition_id == num_partitions_ - 1
+            ? end_
+            : begin + partition_length_;
+    return Range(begin, end);
+  }
+
+ private:
+  RangeSplitter(const std::size_t begin,
+                const std::size_t end,
+                const std::size_t num_partitions,
+                const std::size_t partition_length)
+      : begin_(begin),
+        end_(end),
+        num_partitions_(num_partitions),
+        partition_length_(partition_length) {
+    DCHECK_LE(begin_, end_);
+    DCHECK_GT(num_partitions_, 0u);
+    // All full-length partitions must fit inside the range itself, i.e.
+    // inside its length (end_ - begin_), not merely below the end offset.
+    DCHECK_LE(num_partitions_ * partition_length_, end_ - begin_);
+  }
+
+  static constexpr std::size_t kMaxNumPartitions =
+      std::numeric_limits<std::size_t>::max();
+
+  const std::size_t begin_;
+  const std::size_t end_;
+  const std::size_t num_partitions_;
+  const std::size_t partition_length_;
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_RANGE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/fff2fab3/utility/ScopedArray.hpp
----------------------------------------------------------------------
diff --git a/utility/ScopedArray.hpp b/utility/ScopedArray.hpp
new file mode 100644
index 0000000..c464ba9
--- /dev/null
+++ b/utility/ScopedArray.hpp
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_SCOPED_ARRAY_HPP_
+#define QUICKSTEP_UTILITY_SCOPED_ARRAY_HPP_
+
+#include <cstddef>
+#include <utility>
+
+#include "utility/ScopedBuffer.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+/**
+ * @brief A typed view over a ScopedBuffer: owns a block of memory sized for
+ *        an array of T and exposes it through T* accessors.
+ *
+ * NOTE(review): only raw bytes are requested from ScopedBuffer; nothing in
+ * this class runs constructors or destructors of T, so it presumably is
+ * meant for types that are usable without construction - confirm.
+ **/
+template <typename T>
+class ScopedArray {
+ public:
+  /**
+   * @brief Allocate storage for length elements of T.
+   *
+   * @param length The number of elements.
+   * @param initialize Forwarded to ScopedBuffer (presumably requests
+   *        zero-filled memory - see ScopedBuffer).
+   **/
+  explicit ScopedArray(const std::size_t length, const bool initialize = false)
+      : buffer_(length * sizeof(T), initialize) {}
+
+  /**
+   * @brief Wrap an existing block of memory (or nothing, by default).
+   *
+   * @param data The memory handed to the underlying ScopedBuffer.
+   **/
+  explicit ScopedArray(T *data = nullptr)
+      : buffer_(data) {}
+
+  /**
+   * @brief Move constructor.
+   *
+   * Deliberately not explicit: an explicit move constructor is ignored in
+   * copy-initialization, which would make it impossible to return a
+   * ScopedArray by value or to write "ScopedArray<T> a = std::move(b);".
+   **/
+  ScopedArray(ScopedArray &&orig)
+      : buffer_(std::move(orig.buffer_)) {}
+
+  /**
+   * @brief Replace the held memory with a new block for length elements.
+   **/
+  inline void reset(const std::size_t length, const bool initialize = false) {
+    buffer_.reset(length * sizeof(T), initialize);
+  }
+
+  /**
+   * @brief Replace the held memory with data (or nothing, by default).
+   **/
+  inline void reset(T *data = nullptr) {
+    buffer_.reset(data);
+  }
+
+  /**
+   * @brief Move assignment.
+   **/
+  inline ScopedArray& operator=(ScopedArray &&rhs) {
+    buffer_ = std::move(rhs.buffer_);
+    return *this;
+  }
+
+  /**
+   * @return The held memory as a pointer to T.
+   **/
+  inline T* get() const {
+    return static_cast<T*>(buffer_.get());
+  }
+
+  /**
+   * @brief Give up the held memory.
+   *
+   * @return The pointer returned by ScopedBuffer::release(), as a T*.
+   **/
+  inline T* release() {
+    return static_cast<T*>(buffer_.release());
+  }
+
+  inline T* operator->() const {
+    return get();
+  }
+
+  /**
+   * @brief Unchecked element access.
+   **/
+  inline T& operator[](const std::size_t index) const {
+    return get()[index];
+  }
+
+ private:
+  ScopedBuffer buffer_;
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_SCOPED_ARRAY_HPP_

Reply via email to