Repository: kudu Updated Branches: refs/heads/master c8dd7b53f -> 8af288a26
Implement BloomFilter Predicate in server side. Change-Id: I62c2de42667d0255d94e19db773240f7f9ee636c Reviewed-on: http://gerrit.cloudera.org:8080/11100 Tested-by: Kudu Jenkins Reviewed-by: Dan Burkert <danburk...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8af288a2 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8af288a2 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8af288a2 Branch: refs/heads/master Commit: 8af288a26a204e2acfc3aa4e642fba7de56b43bb Parents: c8dd7b5 Author: triplesheep <triplesheep0...@gmail.com> Authored: Wed Aug 1 10:54:21 2018 +0000 Committer: Dan Burkert <danburk...@apache.org> Committed: Thu Oct 11 22:50:37 2018 +0000 ---------------------------------------------------------------------- src/kudu/common/column_predicate-test.cc | 287 ++++++++++++++++++++++++++ src/kudu/common/column_predicate.cc | 193 +++++++++++++++-- src/kudu/common/column_predicate.h | 140 ++++++++++++- src/kudu/common/common.proto | 40 +++- src/kudu/common/key_util.cc | 2 + src/kudu/common/scan_spec.cc | 4 + src/kudu/common/wire_protocol-test.cc | 128 +++++++++++- src/kudu/common/wire_protocol.cc | 79 ++++++- src/kudu/tablet/cfile_set-test.cc | 211 ++++++++++++++++++- src/kudu/util/bloom_filter.h | 23 ++- 10 files changed, 1068 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/column_predicate-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/common/column_predicate-test.cc b/src/kudu/common/column_predicate-test.cc index d96cc41..e0b6b63 100644 --- a/src/kudu/common/column_predicate-test.cc +++ b/src/kudu/common/column_predicate-test.cc @@ -30,8 +30,10 @@ #include "kudu/common/schema.h" #include "kudu/common/types.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/util/bloom_filter.h" 
#include "kudu/util/int128.h" #include "kudu/util/memory/arena.h" +#include "kudu/util/random.h" #include "kudu/util/slice.h" #include "kudu/util/test_util.h" @@ -41,6 +43,7 @@ namespace kudu { class TestColumnPredicate : public KuduTest { public: + TestColumnPredicate() : rand_(SeedRandom()) {} // Test that when a is merged into b and vice versa, the result is equal to // expected, and the resulting type is equal to type. @@ -68,6 +71,42 @@ class TestColumnPredicate : public KuduTest { ASSERT_EQ(b_base.predicate_type(), type); } + void FillBloomFilterAndValues(int n_keys, + vector<uint64_t>* values, + BloomFilterBuilder* bfb1, + BloomFilterBuilder* bfb2) { + uint64_t current = 0; + for (int i = 0; i < 2; ++i) { + while (true) { + uint64_t key = rand_.Next(); + if (key <= current) { + continue; + } + current = key; + Slice key_slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)); + BloomKeyProbe probe(key_slice, MURMUR_HASH_2); + bfb1->AddKey(probe); + bfb2->AddKey(probe); + values->emplace_back(key); + break; + } + } + for (int i = 2; i < n_keys; ++i) { + while (true) { + uint64_t key = rand_.Next(); + Slice key_slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)); + BloomKeyProbe probe(key_slice, MURMUR_HASH_2); + BloomFilter bf(bfb1->slice(), bfb1->n_hashes()); + if (!bf.MayContainKey(probe) && key > current) { + current = key; + values->emplace_back(key); + bfb2->AddKey(probe); + break; + } + } + } + } + template <typename T> void TestMergeCombinations(const ColumnSchema& column, vector<T> values) { // Range + Range @@ -744,6 +783,184 @@ class TestColumnPredicate : public KuduTest { ColumnPredicate::IsNull(column), PredicateType::IsNull); } + + template <typename T> + void TestMergeBloomFilterCombinations(const ColumnSchema& column, + vector<ColumnPredicate::BloomFilterInner>* bf, + vector<T> values) { + vector<ColumnPredicate::BloomFilterInner> orig_bloom_filters = *bf; + // BloomFilter AND + // NONE + // = + // NONE + 
TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), + ColumnPredicate::None(column), + ColumnPredicate::None(column), + PredicateType::None); + + // BloomFilter AND + // Equality + // = + // Equality + *bf = orig_bloom_filters; + TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), + ColumnPredicate::Equality(column, &values[0]), + ColumnPredicate::Equality(column, &values[0]), + PredicateType::Equality); + + // BloomFilter AND + // Equality + // = + // None + *bf = orig_bloom_filters; + TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), + ColumnPredicate::Equality(column, &values[2]), + ColumnPredicate::None(column), + PredicateType::None); + + // BloomFilter AND + // IS NOT NULL + // = + // BloomFilter + *bf = orig_bloom_filters; + vector<ColumnPredicate::BloomFilterInner> bf_copy = *bf; + TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), + ColumnPredicate::IsNotNull(column), + ColumnPredicate::InBloomFilter(column, &bf_copy, nullptr, nullptr), + PredicateType::InBloomFilter); + + // BloomFilter AND + // IS NULL + // = + // None + *bf = orig_bloom_filters; + TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), + ColumnPredicate::IsNull(column), + ColumnPredicate::None(column), + PredicateType::None); + + // BloomFilter AND + // InList + // = + // None(the value in list can not hit bloom filter) + *bf = orig_bloom_filters; + vector<const void*> in_list = { &values[2], &values[3], &values[4] }; + vector<const void*> hit_list; + TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), + ColumnPredicate::InList(column, &in_list), + ColumnPredicate::None(column), + PredicateType::None); + + // BloomFilter AND + // InList + // = + // InList(the value in list all hits bloom filter) + in_list = { &values[0], &values[1] }; + hit_list = { &values[0], &values[1] }; + *bf = orig_bloom_filters; + TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, 
nullptr), + ColumnPredicate::InList(column, &in_list), + ColumnPredicate::InList(column, &hit_list), + PredicateType::InList); + + // BloomFilter AND + // InList + // = + // InList(only some of the values in the list hit the bloom filter) + in_list = { &values[0], &values[1], &values[2], &values[3] }; + hit_list = { &values[0], &values[1]}; + *bf = orig_bloom_filters; + TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), + ColumnPredicate::InList(column, &in_list), + ColumnPredicate::InList(column, &hit_list), + PredicateType::InList); + + // BloomFilter AND + // InList + // = + // Equality(only the first value in the list hits the bloom filter, so it simplifies to Equality) + in_list = { &values[0], &values[2], &values[3] }; + *bf = orig_bloom_filters; + TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), + ColumnPredicate::InList(column, &in_list), + ColumnPredicate::Equality(column, &values[0]), + PredicateType::Equality); + + // Range AND + // BloomFilter + // = + // BloomFilter with lower and upper bound + *bf = orig_bloom_filters; + bf_copy = *bf; + TestMerge(ColumnPredicate::Range(column, &values[0], &values[4]), + ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), + ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]), + PredicateType::InBloomFilter); + + // BloomFilter with lower and upper bound AND + // Range + // = + // BloomFilter with lower and upper bound + *bf = orig_bloom_filters; + bf_copy = *bf; + TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[4]), + ColumnPredicate::Range(column, &values[1], &values[3]), + ColumnPredicate::InBloomFilter(column, &bf_copy, &values[1], &values[3]), + PredicateType::InBloomFilter); + + // BloomFilter with lower and upper bound AND + // Range + // = + // None + *bf = orig_bloom_filters; + bf_copy = *bf; + TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[2]), + ColumnPredicate::Range(column, &values[2], &values[4]), + 
ColumnPredicate::None(column), + PredicateType::None); + + // BloomFilter AND + // BloomFilter with lower and upper bound + // = + // BloomFilter with lower and upper bound + *bf = orig_bloom_filters; + bf_copy = *bf; + vector<ColumnPredicate::BloomFilterInner> collect = *bf; + collect.insert(collect.end(), bf->begin(), bf->end()); + TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), + ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]), + ColumnPredicate::InBloomFilter(column, &collect, &values[0], &values[4]), + PredicateType::InBloomFilter); + + // BloomFilter with lower and upper bound AND + // BloomFilter with lower and upper bound + // = + // BloomFilter with lower and upper bound + *bf = orig_bloom_filters; + collect = *bf; + bf_copy = *bf; + collect.insert(collect.end(), bf->begin(), bf->end()); + TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[1], &values[3]), + ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]), + ColumnPredicate::InBloomFilter(column, &collect, &values[1], &values[3]), + PredicateType::InBloomFilter); + + // BloomFilter with lower and upper bound AND + // BloomFilter with lower and upper bound + // = + // None + *bf = orig_bloom_filters; + collect = *bf; + bf_copy = *bf; + collect.insert(collect.end(), bf->begin(), bf->end()); + TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[2]), + ColumnPredicate::InBloomFilter(column, &bf_copy, &values[2], &values[4]), + ColumnPredicate::None(column), + PredicateType::None); + } + + protected: + Random rand_; }; TEST_F(TestColumnPredicate, TestMerge) { @@ -1161,4 +1378,74 @@ TEST_F(TestColumnPredicate, TestRedaction) { ASSERT_EQ("a = <redacted>", ColumnPredicate::Equality(column_i32, &one_32).ToString()); } +TEST_F(TestColumnPredicate, TestBloomFilterMerge) { + int n_keys = 5; // 0 1 both hit bf1 and bf2, 2 3 4 only hit bf2. + // Test for UINT64 type. 
+ BloomFilterBuilder bfb1( + BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01)); + double expected_fp_rate1 = bfb1.false_positive_rate(); + ASSERT_NEAR(expected_fp_rate1, 0.01, 0.002); + ASSERT_EQ(9, bfb1.n_bits() / n_keys); + BloomFilterBuilder bfb2( + BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01)); + double expected_fp_rate2 = bfb2.false_positive_rate(); + ASSERT_NEAR(expected_fp_rate2, 0.01, 0.002); + ASSERT_EQ(9, bfb2.n_bits() / n_keys); + vector<uint64_t> values_int; + FillBloomFilterAndValues(n_keys, &values_int, &bfb1, &bfb2); + const Slice slice1 = bfb1.slice(); + const Slice slice2 = bfb2.slice(); + ColumnPredicate::BloomFilterInner bf1(slice1, bfb1.n_hashes(), MURMUR_HASH_2); + ColumnPredicate::BloomFilterInner bf2(slice2, bfb2.n_hashes(), MURMUR_HASH_2); + vector<ColumnPredicate::BloomFilterInner> bfs; + bfs.emplace_back(bf1); + TestMergeBloomFilterCombinations(ColumnSchema("c", INT64, true), &bfs, values_int); + bfs.clear(); + bfs.emplace_back(bf1); + bfs.emplace_back(bf2); + TestMergeBloomFilterCombinations(ColumnSchema("c", INT64, true), &bfs, values_int); + + // Test for STRING type. + BloomFilterBuilder bfb3( + BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01)); + double expected_fp_rate3 = bfb3.false_positive_rate(); + ASSERT_NEAR(expected_fp_rate3, 0.01, 0.002); + ASSERT_EQ(9, bfb3.n_bits() / n_keys); + // 0 1 both hit bf1 and bf2, 2 3 4 only hit bf2. 
+ vector<std::string> keys = {"0", "00", "10", "100", "1100"}; + vector<Slice> keys_slice; + for (int i = 0; i < keys.size(); ++i) { + Slice key_slice(keys[i]); + BloomKeyProbe probe(key_slice, MURMUR_HASH_2); + if (i < 2) { + bfb3.AddKey(probe); + } + keys_slice.emplace_back(key_slice); + } + bfs.clear(); + bfs.emplace_back(bfb3.slice(), bfb3.n_hashes(), MURMUR_HASH_2); + TestMergeBloomFilterCombinations(ColumnSchema("c", STRING, true), &bfs, keys_slice); + + // Test for BINARY type + BloomFilterBuilder bfb4( + BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01)); + double expected_fp_rate4 = bfb4.false_positive_rate(); + ASSERT_NEAR(expected_fp_rate4, 0.01, 0.002); + ASSERT_EQ(9, bfb4.n_bits() / n_keys); + vector<Slice> binary_keys = { Slice("", 0), + Slice("\0", 1), + Slice("\0\0", 2), + Slice("\0\0\0", 3), + Slice("\0\0\0\0", 4) }; + for (int i = 0; i < binary_keys.size(); ++i) { + BloomKeyProbe probe(binary_keys[i], MURMUR_HASH_2); + if (i < 2) { + bfb4.AddKey(probe); + } + } + bfs.clear(); + bfs.emplace_back(bfb4.slice(), bfb4.n_hashes(), MURMUR_HASH_2); + TestMergeBloomFilterCombinations(ColumnSchema("c", STRING, true), &bfs, binary_keys); +} + } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/column_predicate.cc ---------------------------------------------------------------------- diff --git a/src/kudu/common/column_predicate.cc b/src/kudu/common/column_predicate.cc index 1c784e6..b923ff4 100644 --- a/src/kudu/common/column_predicate.cc +++ b/src/kudu/common/column_predicate.cc @@ -19,6 +19,7 @@ #include <algorithm> #include <cstring> +#include <iterator> #include <boost/optional/optional.hpp> @@ -27,6 +28,7 @@ #include "kudu/common/rowblock.h" #include "kudu/common/schema.h" #include "kudu/common/types.h" +#include "kudu/gutil/macros.h" #include "kudu/gutil/strings/join.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/util/bitmap.h" @@ -59,6 +61,18 @@ ColumnPredicate::ColumnPredicate(PredicateType 
predicate_type, values_.swap(*values); } +ColumnPredicate::ColumnPredicate(PredicateType predicate_type, + ColumnSchema column, + std::vector<BloomFilterInner>* bfs, + const void* lower, + const void* upper) + : predicate_type_(predicate_type), + column_(move(column)), + lower_(lower), + upper_(upper) { + bloom_filters_.swap(*bfs); +} + ColumnPredicate ColumnPredicate::Equality(ColumnSchema column, const void* value) { CHECK(value != nullptr); return ColumnPredicate(PredicateType::Equality, move(column), value, nullptr); @@ -93,6 +107,17 @@ ColumnPredicate ColumnPredicate::InList(ColumnSchema column, return pred; } +ColumnPredicate ColumnPredicate::InBloomFilter(ColumnSchema column, + std::vector<BloomFilterInner>* bfs, + const void* lower, + const void* upper) { + CHECK(bfs != nullptr); + CHECK(!bfs->empty()); + ColumnPredicate pred(PredicateType::InBloomFilter, move(column), bfs, lower, upper); + pred.Simplify(); + return pred; +} + boost::optional<ColumnPredicate> ColumnPredicate::InclusiveRange(ColumnSchema column, const void* lower, const void* upper, @@ -167,7 +192,7 @@ void ColumnPredicate::SetToNone() { upper_ = nullptr; } -// TODO: For decimal columns, use column_.type_attributes().precision +// TODO(granthenke): For decimal columns, use column_.type_attributes().precision // to calculate the "true" max/min values for improved simplification. 
void ColumnPredicate::Simplify() { auto type_info = column_.type_info(); @@ -193,17 +218,13 @@ void ColumnPredicate::Simplify() { if (type_info->IsMinValue(lower_)) { predicate_type_ = PredicateType::IsNotNull; lower_ = nullptr; - upper_ = nullptr; } else if (type_info->IsMaxValue(lower_)) { predicate_type_ = PredicateType::Equality; - upper_ = nullptr; } } else if (upper_ != nullptr) { // VALUE < _ if (type_info->IsMinValue(upper_)) { - predicate_type_ = PredicateType::None; - lower_ = nullptr; - upper_ = nullptr; + SetToNone(); } } return; @@ -229,6 +250,42 @@ void ColumnPredicate::Simplify() { } return; }; + case PredicateType::InBloomFilter: { + if (lower_ == nullptr && upper_ == nullptr) { + return; + } + // Merge the optional lower and upper bound. + if (lower_ != nullptr && upper_ != nullptr) { + if (type_info->Compare(lower_, upper_) >= 0) { + // If the range bounds are empty then no results can be returned. + SetToNone(); + } else if (type_info->AreConsecutive(lower_, upper_)) { + if (CheckValueInBloomFilter(lower_)) { + predicate_type_ = PredicateType::Equality; + upper_ = nullptr; + bloom_filters_.clear(); + } else { + SetToNone(); + } + } + } else if (lower_ != nullptr) { + if (type_info->IsMinValue(lower_)) { + lower_ = nullptr; + } else if (type_info->IsMaxValue(lower_)) { + if (CheckValueInBloomFilter(lower_)) { + predicate_type_ = PredicateType::Equality; + bloom_filters_.clear(); + } else { + SetToNone(); + } + } + } else if (upper_ != nullptr) { + if (type_info->IsMinValue(upper_)) { + SetToNone(); + } + } + return; + }; } LOG(FATAL) << "unknown predicate type"; } @@ -257,6 +314,10 @@ void ColumnPredicate::Merge(const ColumnPredicate& other) { MergeIntoInList(other); return; }; + case PredicateType::InBloomFilter: { + MergeIntoBloomFilter(other); + return; + }; } LOG(FATAL) << "unknown predicate type"; } @@ -269,7 +330,11 @@ void ColumnPredicate::MergeIntoRange(const ColumnPredicate& other) { SetToNone(); return; }; - + case 
PredicateType::InBloomFilter: { + bloom_filters_ = other.bloom_filters_; + predicate_type_ = PredicateType::InBloomFilter; + FALLTHROUGH_INTENDED; + } case PredicateType::Range: { // Set the lower bound to the larger of the two. if (other.lower_ != nullptr && @@ -286,7 +351,6 @@ void ColumnPredicate::MergeIntoRange(const ColumnPredicate& other) { Simplify(); return; }; - case PredicateType::Equality: { if ((lower_ != nullptr && column_.type_info()->Compare(lower_, other.lower_) > 0) || (upper_ != nullptr && column_.type_info()->Compare(upper_, other.lower_) <= 0)) { @@ -303,7 +367,7 @@ void ColumnPredicate::MergeIntoRange(const ColumnPredicate& other) { case PredicateType::IsNull: { SetToNone(); return; - } + }; case PredicateType::InList : { // The InList predicate values are examined to check whether // they lie in the range. @@ -360,6 +424,12 @@ void ColumnPredicate::MergeIntoEquality(const ColumnPredicate& other) { } return; }; + case PredicateType::InBloomFilter: { + if (!other.CheckValueInBloomFilter(lower_)) { + SetToNone(); + } + return; + }; } LOG(FATAL) << "unknown predicate type"; } @@ -378,6 +448,7 @@ void ColumnPredicate::MergeIntoIsNotNull(const ColumnPredicate &other) { lower_ = other.lower_; upper_ = other.upper_; values_ = other.values_; + bloom_filters_ = other.bloom_filters_; return; } } @@ -499,6 +570,77 @@ void ColumnPredicate::MergeIntoInList(const ColumnPredicate &other) { Simplify(); return; }; + case PredicateType::InBloomFilter: { + std::vector<const void*> new_values; + std::copy_if(values_.begin(), values_.end(), std::back_inserter(new_values), + [&] (const void* value) { + return other.CheckValueInBloomFilter(value); + }); + values_.swap(new_values); + Simplify(); + return; + }; + } + LOG(FATAL) << "unknown predicate type"; +} + +void ColumnPredicate::MergeIntoBloomFilter(const ColumnPredicate &other) { + CHECK(predicate_type_ == PredicateType::InBloomFilter); + DCHECK(!bloom_filters_.empty()); + + switch (other.predicate_type()) { + 
case PredicateType::None: { + SetToNone(); + return; + }; + case PredicateType::InBloomFilter: { + bloom_filters_.insert(bloom_filters_.end(), other.bloom_filters().begin(), + other.bloom_filters().end()); + FALLTHROUGH_INTENDED; + } + case PredicateType::Range: { + // Merge the optional lower and upper bound. + if (other.lower_ != nullptr && + (lower_ == nullptr || column_.type_info()->Compare(lower_, other.lower_) < 0)) { + lower_ = other.lower_; + } + if (other.upper_ != nullptr && + (upper_ == nullptr || column_.type_info()->Compare(upper_, other.upper_) > 0)) { + upper_ = other.upper_; + } + Simplify(); + return; + } + case PredicateType::Equality: { + if (CheckValueInBloomFilter(other.lower_)) { + // Value falls in bloom filters so change to Equality predicate. + predicate_type_ = PredicateType::Equality; + lower_ = other.lower_; + upper_ = nullptr; + bloom_filters_.clear(); + } else { + SetToNone(); // Value does not fall in bloom filters. + } + return; + } + case PredicateType::IsNotNull: return; + case PredicateType::IsNull: { + SetToNone(); + return; + } + case PredicateType::InList: { + DCHECK(other.values_.size() > 1); + std::vector<const void*> new_values; + std::copy_if(other.values_.begin(), other.values_.end(), std::back_inserter(new_values), + [&] (const void* value) { + return CheckValueInBloomFilter(value); + }); + predicate_type_ = PredicateType::InList; + values_.swap(new_values); + bloom_filters_.clear(); + Simplify(); + return; + } } LOG(FATAL) << "unknown predicate type"; } @@ -588,6 +730,12 @@ void ColumnPredicate::EvaluateForPhysicalType(const ColumnBlock& block, return; }; case PredicateType::None: LOG(FATAL) << "NONE predicate evaluation"; + case PredicateType::InBloomFilter: { + ApplyPredicate(block, sel, [this] (const void* cell) { + return EvaluateCell<PhysicalType>(cell); + }); + return; + }; } LOG(FATAL) << "unknown predicate type"; } @@ -666,6 +814,9 @@ string ColumnPredicate::ToString() const { ss.append(")"); return ss; }; + case 
PredicateType::InBloomFilter: { + return strings::Substitute("`$0` IS InBloomFilter", column_.name()); + }; } LOG(FATAL) << "unknown predicate type"; } @@ -677,13 +828,18 @@ bool ColumnPredicate::operator==(const ColumnPredicate& other) const { } switch (predicate_type_) { case PredicateType::Equality: return column_.type_info()->Compare(lower_, other.lower_) == 0; + case PredicateType::InBloomFilter: { + if (bloom_filters_ != other.bloom_filters()) { + return false; + } + FALLTHROUGH_INTENDED; + }; case PredicateType::Range: { - return (lower_ == other.lower_ || - (lower_ != nullptr && other.lower_ != nullptr && - column_.type_info()->Compare(lower_, other.lower_) == 0)) && - (upper_ == other.upper_ || - (upper_ != nullptr && other.upper_ != nullptr && - column_.type_info()->Compare(upper_, other.upper_) == 0)); + auto bound_equal = [&] (const void* eleml, const void* elemr) { + return (eleml == elemr || (eleml != nullptr && elemr != nullptr && + column_.type_info()->Compare(eleml, elemr) == 0)); + }; + return bound_equal(lower_, other.lower_) && bound_equal(upper_, other.upper_); }; case PredicateType::InList: { if (values_.size() != other.values_.size()) return false; @@ -712,6 +868,10 @@ bool ColumnPredicate::CheckValueInList(const void* value) const { }); } +bool ColumnPredicate::CheckValueInBloomFilter(const void* value) const { + return EvaluateCell(column_.type_info()->physical_type(), value); +} + namespace { int SelectivityRank(const ColumnPredicate& predicate) { int rank; @@ -721,7 +881,8 @@ int SelectivityRank(const ColumnPredicate& predicate) { case PredicateType::Equality: rank = 2; break; case PredicateType::InList: rank = 3; break; case PredicateType::Range: rank = 4; break; - case PredicateType::IsNotNull: rank = 5; break; + case PredicateType::InBloomFilter: rank = 5; break; + case PredicateType::IsNotNull: rank = 6; break; default: LOG(FATAL) << "unknown predicate type"; } return rank * (kLargestTypeSize + 1) + 
predicate.column().type_info()->size(); http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/column_predicate.h ---------------------------------------------------------------------- diff --git a/src/kudu/common/column_predicate.h b/src/kudu/common/column_predicate.h index 0155185..ea8385b 100644 --- a/src/kudu/common/column_predicate.h +++ b/src/kudu/common/column_predicate.h @@ -17,6 +17,9 @@ #pragma once +#include <cstddef> +#include <cstdint> + #include <algorithm> #include <ostream> #include <string> @@ -28,6 +31,8 @@ #include "kudu/common/common.pb.h" #include "kudu/common/schema.h" #include "kudu/common/types.h" +#include "kudu/util/bloom_filter.h" +#include "kudu/util/slice.h" namespace kudu { @@ -56,6 +61,10 @@ enum class PredicateType { // A predicate which evaluates to true if the column value is present in // a value list. InList, + + // A predicate which evaluates to true if the column value is present in + // a bloom filter. + InBloomFilter, }; // A predicate which can be evaluated over a block of column values. @@ -73,6 +82,8 @@ enum class PredicateType { class ColumnPredicate { public: + class BloomFilterInner; + // Creates a new equality predicate on the column and value. // // The value is not copied, and must outlive the returned predicate. @@ -130,6 +141,12 @@ class ColumnPredicate { // The InList will be simplified into an Equality, Range or None if possible. static ColumnPredicate InList(ColumnSchema column, std::vector<const void*>* values); + // Create a new BloomFilter predicate for the column. + // + // The values are not copied, and must outlive the returned predicate. + static ColumnPredicate InBloomFilter(ColumnSchema column, std::vector<BloomFilterInner>* bfs, + const void* lower, const void* upper); + // Creates a new predicate which matches no values. 
static ColumnPredicate None(ColumnSchema column); @@ -174,12 +191,13 @@ class ColumnPredicate { case PredicateType::Range: { if (lower_ == nullptr) { return DataTypeTraits<PhysicalType>::Compare(cell, this->upper_) < 0; - } else if (upper_ == nullptr) { + } + if (upper_ == nullptr) { return DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) >= 0; - } else { - return DataTypeTraits<PhysicalType>::Compare(cell, this->upper_) < 0 && - DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) >= 0; } + return DataTypeTraits<PhysicalType>::Compare(cell, this->upper_) < 0 && + DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) >= 0; + }; case PredicateType::Equality: { return DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) == 0; @@ -196,6 +214,9 @@ class ColumnPredicate { return DataTypeTraits<PhysicalType>::Compare(lhs, rhs) < 0; }); }; + case PredicateType::InBloomFilter: { + return EvaluateCellForBloomFilter<PhysicalType>(cell); + }; } LOG(FATAL) << "unknown predicate type"; } @@ -233,6 +254,64 @@ class ColumnPredicate { const std::vector<const void*>& raw_values() const { return values_; } + // Returns bloom filters if this is a bloom filter predicate. + const std::vector<BloomFilterInner>& bloom_filters() const { + return bloom_filters_; + } + + // This class represents the bloom filter used in predicate. 
+ class BloomFilterInner { + public: + + BloomFilterInner(Slice bloom_data, size_t nhash, HashAlgorithm hash_algorithm) : + bloom_data_(bloom_data), + nhash_(nhash), + hash_algorithm_(hash_algorithm) { + } + + BloomFilterInner() : nhash_(0), hash_algorithm_(CITY_HASH) {} + + const Slice& bloom_data() const { + return bloom_data_; + } + + size_t nhash() const { + return nhash_; + } + + HashAlgorithm hash_algorithm() const { + return hash_algorithm_; + } + + void set_nhash(size_t nhash) { + nhash_ = nhash; + } + + void set_bloom_data(Slice bloom_data) { + bloom_data_ = bloom_data; + } + + void set_hash_algorithm(HashAlgorithm hash_algorithm) { + hash_algorithm_ = hash_algorithm; + } + + bool operator==(const BloomFilterInner& other) const { + return (bloom_data_ == other.bloom_data() && + nhash_ == other.nhash() && + hash_algorithm_ == other.hash_algorithm()); + } + + private: + + // The slice of bloom filter. + Slice bloom_data_; + + // The number of hashes used by the bloom filter. + size_t nhash_; + + // The hash algorithm used in bloom filter. + HashAlgorithm hash_algorithm_; + }; private: @@ -249,6 +328,13 @@ class ColumnPredicate { ColumnSchema column, std::vector<const void*>* values); + // Creates a new BloomFilter column predicate. + ColumnPredicate(PredicateType predicate_type, + ColumnSchema column, + std::vector<BloomFilterInner>* bfs, + const void* lower, + const void* upper); + // Transition to a None predicate type. void SetToNone(); @@ -267,14 +353,49 @@ class ColumnPredicate { // Merge another predicate into this IS NULL predicate. void MergeIntoIsNull(const ColumnPredicate& other); + // Merge another predicate into this Bloom Filter predicate. + void MergeIntoBloomFilter(const ColumnPredicate& other); + + // Merge another predicate into this InList predicate. + void MergeIntoInList(const ColumnPredicate& other); + // Templated evaluation to inline the dispatch of comparator. Templating this // allows dispatch to occur only once per batch. 
template <DataType PhysicalType> void EvaluateForPhysicalType(const ColumnBlock& block, SelectionVector* sel) const; - // Merge another predicate into this InList predicate. - void MergeIntoInList(const ColumnPredicate& other); + // Evaluate the bloom filter and avoid the predicate type check on a single cell. + template <DataType PhysicalType> + bool EvaluateCellForBloomFilter(const void* cell) const { + typedef typename DataTypeTraits<PhysicalType>::cpp_type cpp_type; + size_t size = sizeof(cpp_type); + const void* data = cell; + if (PhysicalType == BINARY) { + const Slice *slice = reinterpret_cast<const Slice *>(cell); + size = slice->size(); + data = slice->data(); + } + Slice cell_slice(reinterpret_cast<const uint8_t*>(data), size); + for (const auto& bf : bloom_filters_) { + BloomKeyProbe probe(cell_slice, bf.hash_algorithm()); + if (!BloomFilter(bf.bloom_data(), bf.nhash()).MayContainKey(probe)) { + return false; + } + } + // Check optional lower and upper bound. + if (lower_ != nullptr && upper_ != nullptr) { + return DataTypeTraits<PhysicalType>::Compare(cell, this->upper_) < 0 && + DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) >= 0; + } + if (upper_ != nullptr) { + return DataTypeTraits<PhysicalType>::Compare(cell, this->upper_) < 0; + } + if (lower_ != nullptr) { + return DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) >= 0; + } + return true; + } // For a Range type predicate, this helper function checks // whether a given value is in the range. @@ -284,6 +405,10 @@ class ColumnPredicate { // whether a given value is in the list. bool CheckValueInList(const void* value) const; + // For an BloomFilter type predicate, this helper function checks + // whether a given value is in the BloomFilter. + bool CheckValueInBloomFilter(const void* value) const; + // The type of this predicate. PredicateType predicate_type_; @@ -299,6 +424,9 @@ class ColumnPredicate { // The list of values to check column against if this is an InList predicate. 
std::vector<const void*> values_; + + // The list of bloom filter in this predicate. + std::vector<BloomFilterInner> bloom_filters_; }; // Compares predicates according to selectivity. Predicates that match fewer http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/common.proto ---------------------------------------------------------------------- diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto index b3d323e..f0e9a33 100644 --- a/src/kudu/common/common.proto +++ b/src/kudu/common/common.proto @@ -285,6 +285,13 @@ enum ReplicaSelection { CLOSEST_REPLICA = 2; } +// The hash algorithm used in bloom filter and hash bucket. +enum HashAlgorithm { + UNKNOWN_HASH = 0; + MURMUR_HASH_2 = 1; + CITY_HASH = 2; +} + // The serialized format of a Kudu table partition schema. message PartitionSchemaPB { @@ -319,11 +326,6 @@ message PartitionSchemaPB { // input. optional uint32 seed = 3; - enum HashAlgorithm { - UNKNOWN = 0; - MURMUR_HASH_2 = 1; - } - // The hash algorithm to use for calculating the hash bucket. optional HashAlgorithm hash_algorithm = 4; } @@ -348,6 +350,15 @@ message ColumnPredicatePB { // The predicate column name. optional string column = 1; + // Represent a bloom filter. + message BloomFilter { + // The hash times for bloom filter. + optional int32 nhash = 1; + // The bloom filter bitmap. + optional bytes bloom_data = 2 [(kudu.REDACT) = true]; + optional HashAlgorithm hash_algorithm = 3 [default = CITY_HASH]; + } + message Range { // Bounds should be encoded as follows: @@ -357,8 +368,7 @@ message ColumnPredicatePB { // // Note that this predicate type should not be used for NULL data -- // NULL is defined to neither be greater than or less than other values - // for the comparison operator. We will eventually add a special - // predicate type for null-ness. + // for the comparison operator. // The inclusive lower bound. 
optional bytes lower = 1 [(kudu.REDACT) = true]; @@ -383,12 +393,28 @@ message ColumnPredicatePB { message IsNull {} + message InBloomFilter { + // A list of bloom filters for the field. + repeated BloomFilter bloom_filters = 1; + + // Lower and upper are optional for InBloomFilter. + // When both an InBloomFilter and a Range predicate are used on the same column, the + // merged result can be an InBloomFilter with the range bounds inside. The lower + // and upper bounds work just like they do in a Range predicate. + // The inclusive lower bound. + optional bytes lower = 2 [(kudu.REDACT) = true]; + + // The exclusive upper bound. + optional bytes upper = 3 [(kudu.REDACT) = true]; + } + oneof predicate { Range range = 2; Equality equality = 3; IsNotNull is_not_null = 4; InList in_list = 5; IsNull is_null = 6; + InBloomFilter in_bloom_filter = 7; } } http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/key_util.cc ---------------------------------------------------------------------- diff --git a/src/kudu/common/key_util.cc b/src/kudu/common/key_util.cc index ed8dd86..ec98dec 100644 --- a/src/kudu/common/key_util.cc +++ b/src/kudu/common/key_util.cc @@ -231,6 +231,7 @@ int PushUpperBoundKeyPredicates(ColIdxIter first, memcpy(row->mutable_cell_ptr(*col_idx_it), predicate->raw_lower(), size); pushed_predicates++; break; + case PredicateType::InBloomFilter: // Upper in InBloomFilter processed as upper in Range. case PredicateType::Range: if (predicate->raw_upper() != nullptr) { memcpy(row->mutable_cell_ptr(*col_idx_it), predicate->raw_upper(), size); @@ -297,6 +298,7 @@ int PushLowerBoundKeyPredicates(ColIdxIter first, size_t size = column.type_info()->size(); switch (predicate->predicate_type()) { + case PredicateType::InBloomFilter: // Lower in InBloomFilter processed as lower in Range.
case PredicateType::Range: if (predicate->raw_lower() == nullptr) { break_loop = true; http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/scan_spec.cc ---------------------------------------------------------------------- diff --git a/src/kudu/common/scan_spec.cc b/src/kudu/common/scan_spec.cc index f5caab8..0660c10 100644 --- a/src/kudu/common/scan_spec.cc +++ b/src/kudu/common/scan_spec.cc @@ -193,6 +193,10 @@ void ScanSpec::PushPredicatesIntoPrimaryKeyBounds(const Schema& schema, // InList predicates should not be removed as the full constraints imposed by an InList // cannot be translated into only a single set of lower and upper bound primary keys break; + } else if (type == PredicateType::InBloomFilter) { + // InBloomFilter predicates should not be removed as the full constraints imposed by bloom + // filters cannot be translated into only a single set of lower and upper bound primary keys + break; } else { LOG(FATAL) << "Can not remove unknown predicate type"; } http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/wire_protocol-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc index 3d06b22..9c11de3 100644 --- a/src/kudu/common/wire_protocol-test.cc +++ b/src/kudu/common/wire_protocol-test.cc @@ -15,8 +15,11 @@ // specific language governing permissions and limitations // under the License. 
+#include "kudu/common/wire_protocol.h" + #include <cstddef> #include <cstdint> +#include <memory> #include <string> #include <vector> @@ -29,9 +32,10 @@ #include "kudu/common/row.h" #include "kudu/common/rowblock.h" #include "kudu/common/schema.h" -#include "kudu/common/wire_protocol.h" #include "kudu/common/wire_protocol.pb.h" +#include "kudu/gutil/port.h" #include "kudu/util/bitmap.h" +#include "kudu/util/bloom_filter.h" #include "kudu/util/faststring.h" #include "kudu/util/hexdump.h" #include "kudu/util/memory/arena.h" @@ -43,6 +47,7 @@ #include "kudu/util/test_util.h" using std::string; +using std::unique_ptr; using std::vector; namespace kudu { @@ -499,4 +504,125 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) { ASSERT_TRUE(ColumnPredicateFromPB(schema, &arena, pb, &predicate).IsInvalidArgument()); } } + +class BFWireProtocolTest : public KuduTest { + public: + BFWireProtocolTest() + : schema_({ ColumnSchema("col1", INT32)}, 1), + arena_(1024), + n_keys_(100) { + bfb1_.reset(new BloomFilterBuilder(BloomFilterSizing::ByCountAndFPRate(n_keys_, 0.01))); + bfb2_.reset(new BloomFilterBuilder(BloomFilterSizing::ByCountAndFPRate(n_keys_, 0.01))); + } + + virtual void SetUp() OVERRIDE { + double expected_fp_rate1 = bfb1()->false_positive_rate(); + ASSERT_NEAR(expected_fp_rate1, 0.01, 0.002); + ASSERT_EQ(9, bfb1()->n_bits() / n_keys_); + double expected_fp_rate2 = bfb2()->false_positive_rate(); + ASSERT_NEAR(expected_fp_rate2, 0.01, 0.002); + ASSERT_EQ(9, bfb2()->n_bits() / n_keys_); + for (int i = 0; i < n_keys_; ++i) { + Slice key_slice(reinterpret_cast<const uint8_t*>(&i), sizeof(i)); + BloomKeyProbe probe(key_slice, MURMUR_HASH_2); + bfb1()->AddKey(probe); + bfb2()->AddKey(probe); + } + } + + BloomFilterBuilder* bfb1() const { return bfb1_.get(); } + + BloomFilterBuilder* bfb2() const { return bfb2_.get(); } + +protected: + Schema schema_; + Arena arena_; + int n_keys_; + unique_ptr<BloomFilterBuilder> bfb1_; + unique_ptr<BloomFilterBuilder> bfb2_; +}; +
+TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilter) { + boost::optional<ColumnPredicate> predicate; + ColumnSchema col1 = schema_.column(0); + { // Single BloomFilter predicate. + vector<kudu::ColumnPredicate::BloomFilterInner> bfs; + bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2); + kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, nullptr, nullptr); + ColumnPredicatePB pb; + NO_FATALS(ColumnPredicateToPB(ibf, &pb)); + ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate)); + ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter); + ASSERT_EQ(predicate, ibf); + } + + { // Multi BloomFilter predicate. + vector<kudu::ColumnPredicate::BloomFilterInner> bfs; + bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2); + bfs.emplace_back(bfb2()->slice(), bfb2()->n_hashes(), MURMUR_HASH_2); + kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, nullptr, nullptr); + ColumnPredicatePB pb; + NO_FATALS(ColumnPredicateToPB(ibf, &pb)); + ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate)); + ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter); + ASSERT_EQ(predicate, ibf); + } +} + +TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilterWithBound) { + boost::optional<ColumnPredicate> predicate; + ColumnSchema col1 = schema_.column(0); + { // Simply BloomFilter with lower bound. + int lower = 1; + vector<kudu::ColumnPredicate::BloomFilterInner> bfs; + bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2); + kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, &lower, nullptr); + ColumnPredicatePB pb; + NO_FATALS(ColumnPredicateToPB(ibf, &pb)); + ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate)); + ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter); + ASSERT_EQ(predicate, ibf); + } + + { // Single bloom filter with upper bound. 
+ int upper = 4; + vector<kudu::ColumnPredicate::BloomFilterInner> bfs; + bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2); + kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, nullptr, &upper); + ColumnPredicatePB pb; + NO_FATALS(ColumnPredicateToPB(ibf, &pb)); + ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate)); + ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter); + ASSERT_EQ(predicate, ibf); + } + + { // Single bloom filter with both lower and upper bound. + int lower = 1; + int upper = 4; + vector<kudu::ColumnPredicate::BloomFilterInner> bfs; + bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2); + kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, &lower, &upper); + ColumnPredicatePB pb; + NO_FATALS(ColumnPredicateToPB(ibf, &pb)); + ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate)); + ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter); + ASSERT_EQ(predicate, ibf); + } + + { // Multi bloom filter with both lower and upper bound. 
+ int lower = 1; + int upper = 4; + vector<kudu::ColumnPredicate::BloomFilterInner> bfs; + bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2); + bfs.emplace_back(bfb2()->slice(), bfb2()->n_hashes(), MURMUR_HASH_2); + kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, &lower, &upper); + ColumnPredicatePB pb; + NO_FATALS(ColumnPredicateToPB(ibf, &pb)); + ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate)); + ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter); + ASSERT_EQ(predicate->bloom_filters().size(), ibf.bloom_filters().size()); + ASSERT_EQ(predicate, ibf); + } +} + } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/wire_protocol.cc ---------------------------------------------------------------------- diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc index 0a5ce2a..2aab6f0 100644 --- a/src/kudu/common/wire_protocol.cc +++ b/src/kudu/common/wire_protocol.cc @@ -409,6 +409,16 @@ void CopyPredicateBoundToPB(const ColumnSchema& col, const void* bound_src, stri bound_dst->assign(reinterpret_cast<const char*>(src), size); } +// Copies a predicate bloom filter data from 'bf_src' into 'bf_dst'. +void CopyPredicateBloomFilterToPB(const ColumnPredicate::BloomFilterInner& bf_src, + ColumnPredicatePB::BloomFilter* bf_dst) { + bf_dst->set_nhash(bf_src.nhash()); + const void* src = bf_src.bloom_data().data(); + size_t size = bf_src.bloom_data().size(); + bf_dst->mutable_bloom_data()->assign(reinterpret_cast<const char*>(src), size); + bf_dst->set_hash_algorithm(bf_src.hash_algorithm()); +} + // Extract a void* pointer suitable for use in a ColumnRangePredicate from the // string protobuf bound. This validates that the pb_value has the correct // length, copies the data into 'arena', and sets *result to point to it. 
@@ -439,6 +449,21 @@ Status CopyPredicateBoundFromPB(const ColumnSchema& schema, return Status::OK(); } + +// Extract BloomFilterInner from bloom data for ColumnBloomFilterPredicate. +Status CopyPredicateBloomFilterFromPB(const ColumnPredicatePB::BloomFilter& bf_src, + ColumnPredicate::BloomFilterInner* dst_src, + Arena* arena) { + size_t bloom_data_size = bf_src.bloom_data().size(); + dst_src->set_nhash(bf_src.nhash()); + // Copy the data from the protobuf into the Arena. + uint8_t* data_copy = static_cast<uint8_t*>(arena->AllocateBytes(bloom_data_size)); + memcpy(data_copy, bf_src.bloom_data().data(), bloom_data_size); + dst_src->set_bloom_data(Slice(data_copy, bloom_data_size)); + dst_src->set_hash_algorithm(bf_src.hash_algorithm()); + return Status::OK(); +} + } // anonymous namespace void ColumnPredicateToPB(const ColumnPredicate& predicate, @@ -481,6 +506,25 @@ void ColumnPredicateToPB(const ColumnPredicate& predicate, return; }; case PredicateType::None: LOG(FATAL) << "None predicate may not be converted to protobuf"; + case PredicateType::InBloomFilter: { + auto* bloom_filter_pred = pb->mutable_in_bloom_filter(); + for (const auto& bf : predicate.bloom_filters()) { + ColumnPredicatePB::BloomFilter* bloom_filter = bloom_filter_pred->add_bloom_filters(); + CopyPredicateBloomFilterToPB(bf, bloom_filter); + } + // Form the optional lower and upper bound. 
+ if (predicate.raw_lower() != nullptr) { + CopyPredicateBoundToPB(predicate.column(), + predicate.raw_lower(), + bloom_filter_pred->mutable_lower()); + } + if (predicate.raw_upper() != nullptr) { + CopyPredicateBoundToPB(predicate.column(), + predicate.raw_upper(), + bloom_filter_pred->mutable_upper()); + } + return; + } } LOG(FATAL) << "unknown predicate type"; } @@ -546,9 +590,40 @@ Status ColumnPredicateFromPB(const Schema& schema, break; }; case ColumnPredicatePB::kIsNull: { - *predicate = ColumnPredicate::IsNull(col); - break; + *predicate = ColumnPredicate::IsNull(col); + break; + }; + case ColumnPredicatePB::kInBloomFilter: { + const auto& in_bloom_filter = pb.in_bloom_filter(); + vector<ColumnPredicate::BloomFilterInner> bloom_filters; + if (in_bloom_filter.bloom_filters_size() == 0) { + return Status::InvalidArgument("Invalid in bloom filter predicate on column: " + "no bloom filter contained", col.name()); + } + for (const auto& bf : in_bloom_filter.bloom_filters()) { + if (!bf.has_nhash() + || !bf.has_bloom_data() + || !bf.has_hash_algorithm() + || bf.hash_algorithm() == UNKNOWN_HASH) { + return Status::InvalidArgument("Invalid in bloom filter predicate on column: " + "missing bloom filter details", col.name()); + } + ColumnPredicate::BloomFilterInner bloom_filter; + RETURN_NOT_OK(CopyPredicateBloomFilterFromPB(bf, &bloom_filter, arena)); + bloom_filters.emplace_back(bloom_filter); + } + // Extract the optional lower and upper bound. 
+ const void* lower = nullptr; + const void* upper = nullptr; + if (in_bloom_filter.has_lower()) { + RETURN_NOT_OK(CopyPredicateBoundFromPB(col, in_bloom_filter.lower(), arena, &lower)); } + if (in_bloom_filter.has_upper()) { + RETURN_NOT_OK(CopyPredicateBoundFromPB(col, in_bloom_filter.upper(), arena, &upper)); + } + *predicate = ColumnPredicate::InBloomFilter(col, &bloom_filters, lower, upper); + break; + }; default: return Status::InvalidArgument("Unknown predicate type for column", col.name()); } return Status::OK(); http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/tablet/cfile_set-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/cfile_set-test.cc b/src/kudu/tablet/cfile_set-test.cc index cef4b71..4ccdb17 100644 --- a/src/kudu/tablet/cfile_set-test.cc +++ b/src/kudu/tablet/cfile_set-test.cc @@ -15,8 +15,12 @@ // specific language governing permissions and limitations // under the License. +#include "kudu/tablet/cfile_set.h" + +#include <algorithm> #include <cstddef> #include <cstdint> +#include <iterator> #include <memory> #include <ostream> #include <string> @@ -24,8 +28,8 @@ #include <gflags/gflags.h> #include <gflags/gflags_declare.h> -#include <gtest/gtest.h> #include <glog/logging.h> +#include <gtest/gtest.h> #include "kudu/common/column_materialization_context.h" #include "kudu/common/column_predicate.h" @@ -44,13 +48,13 @@ #include "kudu/gutil/port.h" #include "kudu/gutil/stringprintf.h" #include "kudu/gutil/strings/stringpiece.h" -#include "kudu/tablet/cfile_set.h" #include "kudu/tablet/diskrowset.h" #include "kudu/tablet/tablet-test-util.h" #include "kudu/util/auto_release_pool.h" #include "kudu/util/bloom_filter.h" #include "kudu/util/mem_tracker.h" #include "kudu/util/memory/arena.h" +#include "kudu/util/slice.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" @@ -100,6 +104,75 @@ class TestCFileSet : public KuduRowSetTest { ASSERT_OK(rsw.Finish()); } + // 
Int32 type add probe to the bloom filter. + // bf1_contain: 0 2 4 6 8 ... (2n)th key for column 1 to form bloom filter. + // bf1_exclude: 1 3 5 7 9 ... (2n + 1)th key for column 1 to form bloom filter. + // bf2_contain: 0 2 4 6 8 ... (2n)th key for column 2 to form bloom filter. + // bf2_exclude: 1 3 5 7 9 ... (2n + 1)th key for column 2 to form bloom filter. + void FillBloomFilter(int nrows, + BloomFilterBuilder* bf1_contain, + BloomFilterBuilder* bf1_exclude, + BloomFilterBuilder* bf2_contain, + BloomFilterBuilder* bf2_exclude) { + int ratio[] = {2, 10, 100}; + bool add = true; + for (int i = 0; i < nrows; ++i) { + int curr1 = i * ratio[0]; + int curr2 = i * ratio[1]; + Slice first(reinterpret_cast<const uint8_t*>(&curr1), sizeof(curr1)); + Slice second(reinterpret_cast<const uint8_t*>(&curr2), sizeof(curr2)); + BloomKeyProbe probe1(first, MURMUR_HASH_2); + BloomKeyProbe probe2(second, MURMUR_HASH_2); + + if (add) { + bf1_contain->AddKey(probe1); + bf2_contain->AddKey(probe2); + } else { + bf1_exclude->AddKey(probe1); + bf2_exclude->AddKey(probe2); + } + add = !add; + } + } + + // Int32 type add probe to the bloom filter. + // ret1_contain: to get the key hits in bf1_contain for column 1. + // ret1_exclude: to get the key hits in bf1_exclude for column 1. + // ret2_contain: to get the key hits in bf2_contain for column 2. + // ret2_exclude: to get the key hits in bf2_exclude for column 2. + // In some case key may hit both contain and exclude bloom filter + // so we get accurate item hits the bloom filter for test behind. 
+ void GetBloomFilterResult(int nrows, BloomFilterBuilder* bf1_contain, + BloomFilterBuilder* bf1_exclude, + BloomFilterBuilder* bf2_contain, + BloomFilterBuilder* bf2_exclude, + vector<size_t>* ret1_contain, + vector<size_t>* ret1_exclude, + vector<size_t>* ret2_contain, + vector<size_t>* ret2_exclude) { + int ratio[] = {2, 10, 100}; + for (int i = 0; i < nrows; ++i) { + int curr1 = i * ratio[0]; + int curr2 = i * ratio[1]; + Slice first(reinterpret_cast<const uint8_t*>(&curr1), sizeof(curr1)); + Slice second(reinterpret_cast<const uint8_t*>(&curr2), sizeof(curr2)); + BloomKeyProbe probe1(first, MURMUR_HASH_2); + BloomKeyProbe probe2(second, MURMUR_HASH_2); + if (BloomFilter(bf1_contain->slice(), bf1_contain->n_hashes()).MayContainKey(probe1)) { + ret1_contain->push_back(i); + } + if (BloomFilter(bf1_exclude->slice(), bf1_exclude->n_hashes()).MayContainKey(probe1)) { + ret1_exclude->push_back(i); + } + if (BloomFilter(bf2_contain->slice(), bf2_contain->n_hashes()).MayContainKey(probe2)) { + ret2_contain->push_back(i); + } + if (BloomFilter(bf2_exclude->slice(), bf2_exclude->n_hashes()).MayContainKey(probe2)) { + ret2_exclude->push_back(i); + } + } + } + // Issue a range scan between 'lower' and 'upper', and verify that all result // rows indeed fall inside that predicate. void DoTestRangeScan(const shared_ptr<CFileSet> &fileset, @@ -135,6 +208,47 @@ class TestCFileSet : public KuduRowSetTest { } } + // Issue a BloomFilter scan and verify that all result + // rows indeed fall inside that predicate. + void DoTestBloomFilterScan(const shared_ptr<CFileSet>& fileset, + vector<ColumnPredicate> predicates, + vector<size_t> target) { + LOG(INFO) << "predicates size: " << predicates.size(); + // Create iterator. + shared_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&schema_)); + gscoped_ptr<RowwiseIterator> iter(new MaterializingIterator(cfile_iter)); + LOG(INFO) << "Target size: " << target.size(); + // Create a scan with a range predicate on the key column. 
+ ScanSpec spec; + for (const auto& pred : predicates) { + spec.AddPredicate(pred); + } + ASSERT_OK(iter->Init(&spec)); + // Check that the range was respected on all the results. + Arena arena(1024); + RowBlock block(schema_, 100, &arena); + while (iter->HasNext()) { + ASSERT_OK_FAST(iter->NextBlock(&block)); + for (size_t i = 0; i < block.nrows(); i++) { + if (block.selection_vector()->IsRowSelected(i)) { + RowBlockRow row = block.row(i); + size_t index = row.row_index(); + vector<size_t>::iterator iter = std::find(target.begin(), target.end(), index); + if (iter == target.end()) { + FAIL() << "Row " << schema_.DebugRow(row) << " should not have " + << "passed predicate "; + } + target.erase(iter); + } + } + } + LOG(INFO) << "Selected size: " << block.selection_vector()->CountSelected(); + if (!target.empty()) { + FAIL() << "Target size " << target.size() << " should have " + << "passed predicate "; + } + } + Status MaterializeColumn(CFileSet::Iterator *iter, size_t col_idx, ColumnBlock *cb) { @@ -349,6 +463,99 @@ TEST_F(TestCFileSet, TestRangePredicates2) { DoTestRangeScan(fileset, kNumRows * 10, kNoBound); } +TEST_F(TestCFileSet, TestBloomFilterPredicates) { + const int kNumRows = 100; + BloomFilterBuilder bfb1_contain( + BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01)); + double expected_fp_rate1 = bfb1_contain.false_positive_rate(); + ASSERT_NEAR(expected_fp_rate1, 0.01, 0.002); + ASSERT_EQ(9, bfb1_contain.n_bits() / kNumRows); + + BloomFilterBuilder bfb1_exclude( + BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01)); + double expected_fp_rate11 = bfb1_exclude.false_positive_rate(); + ASSERT_NEAR(expected_fp_rate11, 0.01, 0.002); + ASSERT_EQ(9, bfb1_exclude.n_bits() / kNumRows); + + BloomFilterBuilder bfb2_contain( + BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01)); + double expected_fp_rate2 = bfb2_contain.false_positive_rate(); + ASSERT_NEAR(expected_fp_rate2, 0.01, 0.002); + ASSERT_EQ(9, bfb2_contain.n_bits() / kNumRows); + + BloomFilterBuilder 
bfb2_exclude( + BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01)); + double expected_fp_rate22 = bfb2_exclude.false_positive_rate(); + ASSERT_NEAR(expected_fp_rate22, 0.01, 0.002); + ASSERT_EQ(9, bfb2_exclude.n_bits() / kNumRows); + + WriteTestRowSet(kNumRows); + vector<size_t> ret1_contain; + vector<size_t> ret1_exclude; + vector<size_t> ret2_contain; + vector<size_t> ret2_exclude; + FillBloomFilter(kNumRows, &bfb1_contain, &bfb1_exclude, &bfb2_contain, &bfb2_exclude); + GetBloomFilterResult(kNumRows, &bfb1_contain, &bfb1_exclude, &bfb2_contain, &bfb2_exclude, + &ret1_contain, &ret1_exclude, &ret2_contain, &ret2_exclude); + + shared_ptr<CFileSet> fileset; + ASSERT_OK(CFileSet::Open(rowset_meta_, MemTracker::GetRootTracker(), &fileset)); + + vector<ColumnPredicate::BloomFilterInner> bfs; + // BloomFilter of column 0 contain. + ColumnPredicate::BloomFilterInner bf1_contain(bfb1_contain.slice(), + bfb1_contain.n_hashes(), MURMUR_HASH_2); + bfs.push_back(bf1_contain); + auto pred1_contain = ColumnPredicate::InBloomFilter(schema_.column(0), &bfs, nullptr, nullptr); + DoTestBloomFilterScan(fileset, { pred1_contain }, ret1_contain); + + // BloomFilter of column 1 contain. + ColumnPredicate::BloomFilterInner bf2_contain(bfb2_contain.slice(), + bfb2_contain.n_hashes(), MURMUR_HASH_2); + bfs.clear(); + bfs.push_back(bf2_contain); + auto pred2_contain = ColumnPredicate::InBloomFilter(schema_.column(1), &bfs, nullptr, nullptr); + DoTestBloomFilterScan(fileset, { pred2_contain }, ret2_contain); + + // BloomFilter of column 0 contain and exclude. 
+ ColumnPredicate::BloomFilterInner bf1_exclude(bfb1_exclude.slice(), + bfb1_exclude.n_hashes(), MURMUR_HASH_2); + bfs.clear(); + bfs.push_back(bf1_contain); + bfs.push_back(bf1_exclude); + vector<size_t> ret1_contain_exclude; + auto pred1_contain_exclude = ColumnPredicate::InBloomFilter(schema_.column(0), + &bfs, nullptr, nullptr); + std::set_intersection(ret1_contain.begin(), ret1_contain.end(), ret1_exclude.begin(), + ret1_exclude.end(), std::back_inserter(ret1_contain_exclude)); + DoTestBloomFilterScan(fileset, { pred1_contain_exclude }, ret1_contain_exclude); + // BloomFilter of column 0 contain and column 1 contain. + vector<size_t> ret12_contain_contain; + std::set_intersection(ret1_contain.begin(), ret1_contain.end(), ret2_contain.begin(), + ret2_contain.end(), std::back_inserter(ret12_contain_contain)); + DoTestBloomFilterScan(fileset, { pred1_contain, pred2_contain }, ret12_contain_contain); + + // BloomFilter of column 0 contain with lower and upper bound. + int32_t lower = 8; + int32_t upper = 58; + int32_t lower_row_index = lower / 2; + int32_t upper_row_index = upper / 2; + vector<size_t> ret1_contain_range = ret1_contain; + vector<size_t>::iterator left = std::lower_bound(ret1_contain_range.begin(), + ret1_contain_range.end(), lower_row_index); + ret1_contain_range.erase(ret1_contain_range.begin(), left); // don't erase left + vector<size_t>::iterator right = std::lower_bound(ret1_contain_range.begin(), + ret1_contain_range.end(), upper_row_index); + ret1_contain_range.erase(right, ret1_contain_range.end()); // erase right + auto range = ColumnPredicate::Range(schema_.column(0), &lower, &upper); + DoTestBloomFilterScan(fileset, { pred1_contain, range }, ret1_contain_range); + + // BloomFilter of column 0 contain with the range bounds built into the predicate.
+ bfs.clear(); + bfs.push_back(bf1_contain); + auto bf_with_range = ColumnPredicate::InBloomFilter(schema_.column(0), &bfs, &lower, &upper); + DoTestBloomFilterScan(fileset, { bf_with_range }, ret1_contain_range); +} } // namespace tablet } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/util/bloom_filter.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/bloom_filter.h b/src/kudu/util/bloom_filter.h index ad4e3eb..0905fda 100644 --- a/src/kudu/util/bloom_filter.h +++ b/src/kudu/util/bloom_filter.h @@ -20,11 +20,13 @@ #include <cstddef> #include <cstdint> +#include "kudu/common/common.pb.h" #include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/hash/city.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/port.h" #include "kudu/util/bitmap.h" +#include "kudu/util/hash_util.h" #include "kudu/util/slice.h" namespace kudu { @@ -52,11 +54,22 @@ class BloomKeyProbe { // // NOTE: proper operation requires that the referenced memory remain // valid for the lifetime of this object. - explicit BloomKeyProbe(const Slice &key) : key_(key) { - uint64_t h = util_hash::CityHash64( - reinterpret_cast<const char *>(key.data()), - key.size()); - + explicit BloomKeyProbe(const Slice &key, HashAlgorithm hash_algorithm = CITY_HASH) + : key_(key) { + uint64_t h = 0; + switch (hash_algorithm) { + case MURMUR_HASH_2: + h = HashUtil::MurmurHash2_64( + reinterpret_cast<const char *>(key.data()), + key.size(), + /*seed=*/0); + break; + case CITY_HASH: + default: + h = util_hash::CityHash64( + reinterpret_cast<const char *>(key.data()), + key.size()); + } // Use the top and bottom halves of the 64-bit hash // as the two independent hash functions for mixing. h_1_ = static_cast<uint32_t>(h);