http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/if/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/if/BUCK b/hbase-native-client/src/hbase/if/BUCK deleted file mode 100644 index c8d51f2..0000000 --- a/hbase-native-client/src/hbase/if/BUCK +++ /dev/null @@ -1,49 +0,0 @@ -## -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -PROTO_SRCS = glob(['*.proto']) -HEADER_FILENAMES = [x.replace('.proto', '.pb.h') for x in PROTO_SRCS] -CC_FILENAMES = [x.replace('.proto', '.pb.cc') for x in PROTO_SRCS] - -genrule( - name='generate-proto-sources', - srcs=PROTO_SRCS, - cmd='mkdir -p $OUT && pwd && protoc --proto_path=. 
--cpp_out=$OUT *.proto', - out='output', ) - -for header_filename in HEADER_FILENAMES: - genrule(name=header_filename, - cmd='mkdir -p `dirname $OUT` ' - ' && cp $(location :generate-proto-sources)/{} $OUT'.format( - header_filename), - out=header_filename, ) -for cc_filename in CC_FILENAMES: - genrule( - name=cc_filename, - cmd='mkdir -p `dirname $OUT` ' - ' && cp $(location :generate-proto-sources)/*.cc `dirname $OUT` ' - ' && cp $(location :generate-proto-sources)/*.h `dirname $OUT`'.format( - cc_filename), - out=cc_filename, ) - -cxx_library(name='if', - header_namespace="hbase/if", - exported_headers=[':' + x for x in HEADER_FILENAMES], - srcs=[':' + x for x in CC_FILENAMES], - deps=['//third-party:protobuf'], - visibility=['PUBLIC', ], - exported_deps=['//third-party:protobuf'])
http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/security/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/security/BUCK b/hbase-native-client/src/hbase/security/BUCK deleted file mode 100644 index c329f30..0000000 --- a/hbase-native-client/src/hbase/security/BUCK +++ /dev/null @@ -1,27 +0,0 @@ -## -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# This is the library dealing with a single connection -# to a single server. -cxx_library( - name="security", - srcs=[], - deps=["//include/hbase/security:security", "//src/hbase/client:conf"], - compiler_flags=['-Weffc++'], - visibility=[ - 'PUBLIC', - ],) http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/serde/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/serde/BUCK b/hbase-native-client/src/hbase/serde/BUCK deleted file mode 100644 index 6b39e0b..0000000 --- a/hbase-native-client/src/hbase/serde/BUCK +++ /dev/null @@ -1,86 +0,0 @@ -## -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -cxx_library( - name="serde", - srcs=[ - "rpc-serde.cc", - "zk.cc", - ], - deps=[ - "//include/hbase/serde:serde", "//src/hbase/if:if", "//third-party:folly", "//src/hbase/utils:utils", "//src/hbase/security:security" - ], - tests=[ - ":client-deserializer-test", - ":client-serializer-test", - ":server-name-test", - ":table-name-test", - ":zk-deserializer-test", - ":region-info-deserializer-test", - ], - compiler_flags=['-Weffc++'], - visibility=[ - 'PUBLIC', - ],) -cxx_test( - name="table-name-test", - srcs=[ - "table-name-test.cc", - ], - deps=[ - ":serde", - ],) -cxx_test( - name="server-name-test", - srcs=[ - "server-name-test.cc", - ], - deps=[ - ":serde", - ],) -cxx_test( - name="client-serializer-test", - srcs=[ - "client-serializer-test.cc", - ], - deps=[ - ":serde", - ],) -cxx_test( - name="client-deserializer-test", - srcs=[ - "client-deserializer-test.cc", - ], - deps=[ - ":serde", - ],) -cxx_test( - name="zk-deserializer-test", - srcs=[ - "zk-deserializer-test.cc", - ], - deps=[ - ":serde", - ],) -cxx_test( - name="region-info-deserializer-test", - srcs=[ - "region-info-deserializer-test.cc", - ], - deps=[ - ":serde", - ],) http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/serde/client-deserializer-test.cc 
---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/serde/client-deserializer-test.cc b/hbase-native-client/src/hbase/serde/client-deserializer-test.cc deleted file mode 100644 index 3e4c42b..0000000 --- a/hbase-native-client/src/hbase/serde/client-deserializer-test.cc +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <folly/io/IOBuf.h> -#include <gtest/gtest.h> - -#include "hbase/if/Client.pb.h" -#include "hbase/serde/rpc-serde.h" - -using namespace hbase; -using folly::IOBuf; -using hbase::pb::GetRequest; -using hbase::pb::RegionSpecifier; -using hbase::pb::RegionSpecifier_RegionSpecifierType; - -TEST(TestRpcSerde, TestReturnFalseOnNullPtr) { - RpcSerde deser{nullptr}; - ASSERT_LT(deser.ParseDelimited(nullptr, nullptr), 0); -} - -TEST(TestRpcSerde, TestReturnFalseOnBadInput) { - RpcSerde deser{nullptr}; - auto buf = IOBuf::copyBuffer("test"); - GetRequest gr; - - ASSERT_LT(deser.ParseDelimited(buf.get(), &gr), 0); -} - -TEST(TestRpcSerde, TestGoodGetRequestFullRoundTrip) { - GetRequest in; - RpcSerde ser{nullptr}; - RpcSerde deser{nullptr}; - - // fill up the GetRequest. 
- in.mutable_region()->set_value("test_region_id"); - in.mutable_region()->set_type( - RegionSpecifier_RegionSpecifierType::RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME); - in.mutable_get()->set_row("test_row"); - - // Create the buffer - auto buf = ser.SerializeDelimited(in); - - GetRequest out; - - int used_bytes = deser.ParseDelimited(buf.get(), &out); - - ASSERT_GT(used_bytes, 0); - ASSERT_EQ(used_bytes, buf->length()); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/serde/client-serializer-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/serde/client-serializer-test.cc b/hbase-native-client/src/hbase/serde/client-serializer-test.cc deleted file mode 100644 index 8279caa..0000000 --- a/hbase-native-client/src/hbase/serde/client-serializer-test.cc +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ -#include <gtest/gtest.h> - -#include <folly/io/Cursor.h> - -#include <string> - -#include "hbase/if/HBase.pb.h" -#include "hbase/if/RPC.pb.h" -#include "hbase/serde/rpc-serde.h" - -using namespace hbase; -using namespace hbase::pb; -using namespace folly; -using namespace folly::io; - -TEST(RpcSerdeTest, PreambleIncludesHBas) { - RpcSerde ser{nullptr}; - auto buf = ser.Preamble(false); - const char *p = reinterpret_cast<const char *>(buf->data()); - // Take the first for chars and make sure they are the - // magic string - EXPECT_EQ("HBas", std::string(p, 4)); - - EXPECT_EQ(6, buf->computeChainDataLength()); -} - -TEST(RpcSerdeTest, PreambleIncludesVersion) { - RpcSerde ser{nullptr}; - auto buf = ser.Preamble(false); - EXPECT_EQ(0, static_cast<const uint8_t *>(buf->data())[4]); - EXPECT_EQ(80, static_cast<const uint8_t *>(buf->data())[5]); -} - -TEST(RpcSerdeTest, TestHeaderLengthPrefixed) { - RpcSerde ser{nullptr}; - auto header = ser.Header("elliott"); - - // The header should be prefixed by 4 bytes of length. - EXPECT_EQ(4, header->length()); - EXPECT_TRUE(header->length() < header->computeChainDataLength()); - EXPECT_TRUE(header->isChained()); - - // Now make sure the length is correct. 
- Cursor cursor(header.get()); - auto prefixed_len = cursor.readBE<uint32_t>(); - EXPECT_EQ(prefixed_len, header->next()->length()); -} - -TEST(RpcSerdeTest, TestHeaderDecode) { - RpcSerde ser{nullptr}; - auto buf = ser.Header("elliott"); - auto header_buf = buf->next(); - ConnectionHeader h; - - EXPECT_TRUE(h.ParseFromArray(header_buf->data(), header_buf->length())); - EXPECT_EQ("elliott", h.user_info().effective_user()); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/serde/region-info-deserializer-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/serde/region-info-deserializer-test.cc b/hbase-native-client/src/hbase/serde/region-info-deserializer-test.cc deleted file mode 100644 index 84219d8..0000000 --- a/hbase-native-client/src/hbase/serde/region-info-deserializer-test.cc +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -#include "hbase/serde/region-info.h" - -#include <gtest/gtest.h> - -#include <string> - -#include "hbase/if/HBase.pb.h" -#include "hbase/serde/table-name.h" - -using std::string; -using hbase::pb::RegionInfo; -using hbase::pb::TableName; - -TEST(TestRegionInfoDesializer, TestDeserialize) { - string ns{"test_ns"}; - string tn{"table_name"}; - string start_row{"AAAAAA"}; - string stop_row{"BBBBBBBBBBBB"}; - uint64_t region_id = 2345678; - - RegionInfo ri_out; - ri_out.set_region_id(region_id); - ri_out.mutable_table_name()->set_namespace_(ns); - ri_out.mutable_table_name()->set_qualifier(tn); - ri_out.set_start_key(start_row); - ri_out.set_end_key(stop_row); - - string header{"PBUF"}; - string ser = header + ri_out.SerializeAsString(); - - auto out = folly::to<RegionInfo>(ser); - - EXPECT_EQ(region_id, out.region_id()); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/serde/server-name-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/serde/server-name-test.cc b/hbase-native-client/src/hbase/serde/server-name-test.cc deleted file mode 100644 index 12d3287..0000000 --- a/hbase-native-client/src/hbase/serde/server-name-test.cc +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "hbase/serde/server-name.h" - -#include <gtest/gtest.h> -#include <string> - -using hbase::pb::ServerName; - -TEST(TestServerName, TestMakeServerName) { - auto sn = folly::to<ServerName>("test:123"); - - ASSERT_EQ("test", sn.host_name()); - ASSERT_EQ(123, sn.port()); -} - -TEST(TestServerName, TestIps) { - auto sn = folly::to<ServerName>("127.0.0.1:999"); - ASSERT_EQ("127.0.0.1", sn.host_name()); - ASSERT_EQ(999, sn.port()); -} - -TEST(TestServerName, TestThrow) { ASSERT_ANY_THROW(folly::to<ServerName>("Ther's no colon here")); } - -TEST(TestServerName, TestIPV6) { - auto sn = folly::to<ServerName>("[::::1]:123"); - - ASSERT_EQ("[::::1]", sn.host_name()); - ASSERT_EQ(123, sn.port()); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/serde/table-name-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/serde/table-name-test.cc b/hbase-native-client/src/hbase/serde/table-name-test.cc deleted file mode 100644 index 77bd6c2..0000000 --- a/hbase-native-client/src/hbase/serde/table-name-test.cc +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <folly/Conv.h> -#include <gtest/gtest.h> - -#include <string> - -#include "hbase/serde/table-name.h" - -using namespace hbase; -using hbase::pb::TableName; - -TEST(TestTableName, TestToStringNoDefault) { - TableName tn; - tn.set_qualifier("TestTableName"); - std::string result = folly::to<std::string>(tn); - ASSERT_EQ(result.find("default"), std::string::npos); - ASSERT_EQ("TestTableName", result); -} - -TEST(TestTableName, TestToStringNoDefaltWhenSet) { - TableName tn; - tn.set_namespace_("default"); - tn.set_qualifier("TestTableName"); - std::string result = folly::to<std::string>(tn); - ASSERT_EQ(result.find("default"), std::string::npos); - ASSERT_EQ("TestTableName", result); -} - -TEST(TestTableName, TestToStringIncludeNS) { - TableName tn; - tn.set_namespace_("hbase"); - tn.set_qualifier("acl"); - std::string result = folly::to<std::string>(tn); - ASSERT_EQ(result.find("hbase"), 0); - ASSERT_EQ("hbase:acl", result); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/serde/zk-deserializer-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/serde/zk-deserializer-test.cc b/hbase-native-client/src/hbase/serde/zk-deserializer-test.cc deleted file mode 100644 index 141efce..0000000 --- a/hbase-native-client/src/hbase/serde/zk-deserializer-test.cc +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "hbase/serde/zk.h" - -#include <folly/Logging.h> -#include <folly/io/Cursor.h> -#include <folly/io/IOBuf.h> -#include <gtest/gtest.h> - -#include "hbase/if/ZooKeeper.pb.h" - -using namespace hbase; -using namespace hbase::pb; -using namespace folly; -using namespace std; -using namespace folly::io; - -// Test that would test if there's nothing there. 
-TEST(TestZkDesializer, TestThrowNoMagicNum) { - ZkDeserializer deser; - MetaRegionServer mrs; - - auto buf = IOBuf::create(100); - buf->append(100); - RWPrivateCursor c{buf.get()}; - c.write<uint8_t>(99); - ASSERT_THROW(deser.Parse(buf.get(), &mrs), runtime_error); -} - -// Test if the protobuf is in a format that we can't decode -TEST(TestZkDesializer, TestBadProtoThrow) { - ZkDeserializer deser; - MetaRegionServer mrs; - string magic{"PBUF"}; - - // Set ServerName - mrs.mutable_server()->set_host_name("test"); - mrs.mutable_server()->set_port(567); - mrs.mutable_server()->set_start_code(9567); - - // One byte magic number - // four bytes for id length - // four bytes for id - // four bytes for PBUF - uint32_t start_len = 1 + 4 + 4 + 4; - // How large the protobuf will be - uint32_t pbuf_size = mrs.ByteSize(); - - auto buf = IOBuf::create(start_len + pbuf_size); - buf->append(start_len + pbuf_size); - RWPrivateCursor c{buf.get()}; - - // Write the magic number - c.write<uint8_t>(255); - // Write the id len - c.writeBE<uint32_t>(4); - // Write the id - c.write<uint32_t>(13); - // Write the PBUF string - c.push(reinterpret_cast<const uint8_t *>(magic.c_str()), 4); - - // Create the protobuf - MetaRegionServer out; - ASSERT_THROW(deser.Parse(buf.get(), &out), runtime_error); -} - -// Test to make sure the whole thing works. 
-TEST(TestZkDesializer, TestNoThrow) { - ZkDeserializer deser; - MetaRegionServer mrs; - string magic{"PBUF"}; - - // Set ServerName - mrs.mutable_server()->set_host_name("test"); - mrs.mutable_server()->set_port(567); - mrs.mutable_server()->set_start_code(9567); - - // One byte magic number - // four bytes for id length - // four bytes for id - // four bytes for PBUF - uint32_t start_len = 1 + 4 + 4 + 4; - // How large the protobuf will be - uint32_t pbuf_size = mrs.ByteSize(); - - auto buf = IOBuf::create(start_len + pbuf_size); - buf->append(start_len + pbuf_size); - RWPrivateCursor c{buf.get()}; - - // Write the magic number - c.write<uint8_t>(255); - // Write the id len - c.writeBE<uint32_t>(4); - // Write the id - c.write<uint32_t>(13); - // Write the PBUF string - c.push(reinterpret_cast<const uint8_t *>(magic.c_str()), 4); - - // Now write the serialized protobuf - mrs.SerializeWithCachedSizesToArray(buf->writableData() + start_len); - - // Create the protobuf - MetaRegionServer out; - ASSERT_TRUE(deser.Parse(buf.get(), &out)); - ASSERT_EQ(mrs.server().host_name(), out.server().host_name()); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/test-util/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/test-util/BUCK b/hbase-native-client/src/hbase/test-util/BUCK deleted file mode 100644 index f1aedab..0000000 --- a/hbase-native-client/src/hbase/test-util/BUCK +++ /dev/null @@ -1,53 +0,0 @@ -## -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import os - -cxx_library( - name="test-util", - header_namespace="hbase/test-util", - exported_headers=["test-util.h", "mini-cluster.h"], - srcs=["test-util.cc", "mini-cluster.cc"], - deps=[ - "//third-party:folly", - "//src/hbase/client:client", - ], - preprocessor_flags=[ - '-I' + os.environ['JAVA_HOME'] + '/include', - '-I' + os.environ['JAVA_HOME'] + '/include/darwin', - '-I' + os.environ['JAVA_HOME'] + '/include/linux' - ], - exported_preprocessor_flags=[ - '-I' + os.environ['JAVA_HOME'] + '/include', - '-I' + os.environ['JAVA_HOME'] + '/include/darwin', - '-I' + os.environ['JAVA_HOME'] + '/include/linux' - ], - compiler_flags=[ - '-I' + os.environ['JAVA_HOME'] + '/include', - '-I' + os.environ['JAVA_HOME'] + '/include/darwin', - '-I' + os.environ['JAVA_HOME'] + '/include/linux', '-ggdb' - ], - linker_flags=[ - '-ljvm', '-L' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server', - '-ggdb' - ], - exported_linker_flags=[ - '-ljvm', '-L' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server', - '-Wl,-rpath=' + os.environ['JAVA_HOME'] + '/jre/lib/amd64/server' - ], - visibility=[ - 'PUBLIC', - ],) http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/test-util/mini-cluster.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/test-util/mini-cluster.cc b/hbase-native-client/src/hbase/test-util/mini-cluster.cc index 1e491a2..98a4e7e 100644 --- a/hbase-native-client/src/hbase/test-util/mini-cluster.cc +++ b/hbase-native-client/src/hbase/test-util/mini-cluster.cc @@ 
-80,7 +80,7 @@ MiniCluster::~MiniCluster() { void MiniCluster::Setup() { jmethodID constructor; - pthread_mutex_lock(&count_mutex_); + std::lock_guard<std::mutex> lock(count_mutex_); if (env_ == NULL) { env_ = CreateVM(&jvm_); if (env_ == NULL) { @@ -176,7 +176,6 @@ void MiniCluster::Setup() { exit(-1); } } - pthread_mutex_unlock(&count_mutex_); } jobject MiniCluster::htu() { http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/test-util/mini-cluster.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/test-util/mini-cluster.h b/hbase-native-client/src/hbase/test-util/mini-cluster.h deleted file mode 100644 index 6b4547c..0000000 --- a/hbase-native-client/src/hbase/test-util/mini-cluster.h +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ -#pragma once - -#include <jni.h> -#include <string> -#include <vector> - -namespace hbase { - -class MiniCluster { - public: - virtual ~MiniCluster(); - jobject StartCluster(int32_t num_region_servers); - void StopCluster(); - jobject CreateTable(const std::string &table, const std::string &family); - jobject CreateTable(const std::string &table, const std::vector<std::string> &families); - jobject CreateTable(const std::string &table, const std::string &family, - const std::vector<std::string> &keys); - jobject CreateTable(const std::string &table, const std::vector<std::string> &families, - const std::vector<std::string> &keys); - jobject StopRegionServer(int idx); - - // moves region to server - void MoveRegion(const std::string &region, const std::string &server); - // returns the Configuration instance for the cluster - jobject GetConf(); - // returns the value for config key retrieved from cluster - const std::string GetConfValue(const std::string &key); - - private: - JNIEnv *env_; - jclass testing_util_class_; - jclass table_name_class_; - jclass put_class_; - jclass conf_class_; - jmethodID stop_rs_mid_; - jmethodID get_conf_mid_; - jmethodID set_conf_mid_; - jmethodID tbl_name_value_of_mid_; - jmethodID create_table_mid_; - jmethodID create_table_families_mid_; - jmethodID create_table_with_split_mid_; - jmethodID put_mid_; - jmethodID put_ctor_; - jmethodID add_col_mid_; - jmethodID create_conn_mid_; - jmethodID get_conn_mid_; - jmethodID get_table_mid_; - jmethodID conf_get_mid_; - jmethodID get_admin_mid_; - jmethodID move_mid_; - jmethodID str_ctor_mid_; - jobject htu_; - jobject cluster_; - pthread_mutex_t count_mutex_; - JavaVM *jvm_; - JNIEnv *CreateVM(JavaVM **jvm); - void Setup(); - jobject htu(); - JNIEnv *env(); - jbyteArray StrToByteChar(const std::string &str); - jobject admin(); -}; -} /*namespace hbase*/ http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/test-util/test-util.h 
---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/test-util/test-util.h b/hbase-native-client/src/hbase/test-util/test-util.h deleted file mode 100644 index 7c57c28..0000000 --- a/hbase-native-client/src/hbase/test-util/test-util.h +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <folly/Random.h> -#include <folly/experimental/TestUtil.h> - -#include <cstdlib> -#include <memory> -#include <string> -#include <vector> -#include "hbase/client/configuration.h" -#include "hbase/test-util/mini-cluster.h" - -namespace hbase { -/** - * @brief Class to deal with a local instance cluster for testing. - */ -class TestUtil { - public: - TestUtil(); - - /** - * Destroying a TestUtil will spin down a cluster and remove the test dir. - */ - ~TestUtil(); - - /** - * Create a random string. This random string is all letters, as such it is - * very good for use as a directory name. 
- */ - static std::string RandString(int len = 32); - - /** - * Returns the configuration to talk to the local cluster - */ - std::shared_ptr<Configuration> conf() const { return conf_; } - - /** - * Starts mini hbase cluster with specified number of region servers - */ - void StartMiniCluster(int32_t num_region_servers); - - void StopMiniCluster(); - void CreateTable(const std::string &table, const std::string &family); - void CreateTable(const std::string &table, const std::vector<std::string> &families); - void CreateTable(const std::string &table, const std::string &family, - const std::vector<std::string> &keys); - void CreateTable(const std::string &table, const std::vector<std::string> &families, - const std::vector<std::string> &keys); - - void StartStandAloneInstance(); - void StopStandAloneInstance(); - void RunShellCmd(const std::string &); - void MoveRegion(const std::string &region, const std::string &server); - - private: - std::unique_ptr<MiniCluster> mini_; - folly::test::TemporaryDirectory temp_dir_; - std::shared_ptr<Configuration> conf_ = std::make_shared<Configuration>(); -}; -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/utils/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/utils/BUCK b/hbase-native-client/src/hbase/utils/BUCK deleted file mode 100644 index ab55d8f..0000000 --- a/hbase-native-client/src/hbase/utils/BUCK +++ /dev/null @@ -1,57 +0,0 @@ -## -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -cxx_library( - name="utils", - exported_headers=[ - ], - srcs=["bytes-util.cc", "connection-util.cc", "user-util.cc"], - deps=[ - '//include/hbase/utils:utils', - '//third-party:folly', - ], - tests=[":user-util-test"], - linker_flags=['-L/usr/local/lib', '-lkrb5'], - exported_linker_flags=['-L/usr/local/lib', '-lkrb5'], - visibility=[ - 'PUBLIC', - ], - compiler_flags=['-Weffc++'],) -cxx_test( - name="bytes-util-test", - srcs=[ - "bytes-util-test.cc", - ], - deps=[ - ":utils", - ],) -cxx_test( - name="concurrent-map-test", - srcs=[ - "concurrent-map-test.cc", - ], - deps=[ - ":utils", - ],) -cxx_test( - name="user-util-test", - srcs=[ - "user-util-test.cc", - ], - deps=[ - ":utils", - ],) http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/utils/bytes-util-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/utils/bytes-util-test.cc b/hbase-native-client/src/hbase/utils/bytes-util-test.cc deleted file mode 100644 index e601d2d..0000000 --- a/hbase-native-client/src/hbase/utils/bytes-util-test.cc +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <folly/Logging.h> -#include <gtest/gtest.h> -#include <string> - -#include "hbase/utils/bytes-util.h" - -using hbase::BytesUtil; - -TEST(TestBytesUtil, TestToStringBinary) { - std::string empty{""}; - EXPECT_EQ(empty, BytesUtil::ToStringBinary(empty)); - - std::string foo_bar{"foo bar"}; - EXPECT_EQ(foo_bar, BytesUtil::ToStringBinary(foo_bar)); - - std::string foo_bar2{"foo bar_/!@#$%^&*(){}[]|1234567890"}; - EXPECT_EQ(foo_bar2, BytesUtil::ToStringBinary(foo_bar2)); - - char zero = 0; - EXPECT_EQ("\\x00", BytesUtil::ToStringBinary(std::string{zero})); - - char max = 255; - EXPECT_EQ("\\xFF", BytesUtil::ToStringBinary(std::string{max})); - - EXPECT_EQ("\\x00\\xFF", BytesUtil::ToStringBinary(std::string{zero} + std::string{max})); - - EXPECT_EQ("foo_\\x00\\xFF_bar", - BytesUtil::ToStringBinary("foo_" + std::string{zero} + std::string{max} + "_bar")); -} - -TEST(TestBytesUtil, TestToStringToInt64) { - int64_t num = 761235; - EXPECT_EQ(num, BytesUtil::ToInt64(BytesUtil::ToString(num))); - - num = -56125; - EXPECT_EQ(num, BytesUtil::ToInt64(BytesUtil::ToString(num))); - - num = 0; - EXPECT_EQ(num, BytesUtil::ToInt64(BytesUtil::ToString(num))); -} - -TEST(TestBytesUtil, TestCreateClosestRowAfter) { - std::string empty{""}; - EXPECT_EQ(BytesUtil::CreateClosestRowAfter(empty), std::string{'\0'}); - - std::string foo{"foo"}; - EXPECT_EQ(BytesUtil::CreateClosestRowAfter(foo), std::string{"foo"} + '\0'); - - EXPECT_EQ("f\\x00", BytesUtil::ToStringBinary(BytesUtil::CreateClosestRowAfter("f"))); -} 
http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/utils/bytes-util.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/utils/bytes-util.cc b/hbase-native-client/src/hbase/utils/bytes-util.cc index 144b866..5b34aa0 100644 --- a/hbase-native-client/src/hbase/utils/bytes-util.cc +++ b/hbase-native-client/src/hbase/utils/bytes-util.cc @@ -19,7 +19,6 @@ #include "hbase/utils/bytes-util.h" -#include <bits/stdc++.h> #include <boost/predef.h> #include <glog/logging.h> http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/utils/concurrent-map-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/utils/concurrent-map-test.cc b/hbase-native-client/src/hbase/utils/concurrent-map-test.cc deleted file mode 100644 index b95c442..0000000 --- a/hbase-native-client/src/hbase/utils/concurrent-map-test.cc +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -#include <folly/Logging.h> -#include <gtest/gtest.h> -#include <string> - -#include "hbase/utils/concurrent-map.h" - -using hbase::concurrent_map; - -TEST(TestConcurrentMap, TestFindAndErase) { - concurrent_map<std::string, std::string> map{500}; - - map.insert(std::make_pair("foo", "bar")); - auto prev = map.find_and_erase("foo"); - ASSERT_EQ("bar", prev); - - ASSERT_EQ(map.end(), map.find("foo")); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/hbase/utils/user-util-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/hbase/utils/user-util-test.cc b/hbase-native-client/src/hbase/utils/user-util-test.cc deleted file mode 100644 index 6a022ef..0000000 --- a/hbase-native-client/src/hbase/utils/user-util-test.cc +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <folly/Logging.h> -#include <gtest/gtest.h> -#include <string> - -#include "hbase/utils/user-util.h" - -using namespace std; -using namespace hbase; - -TEST(TestUserUtil, TestGetSomething) { - UserUtil u_util; - string name = u_util.user_name(false); - - // TODO shell out to whoami to check this. 
- ASSERT_GT(name.length(), 0); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/test/append-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/test/append-test.cc b/hbase-native-client/src/test/append-test.cc new file mode 100644 index 0000000..1af138d --- /dev/null +++ b/hbase-native-client/src/test/append-test.cc @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "hbase/client/append.h" +#include "hbase/client/mutation.h" +#include "hbase/utils/time-util.h" + +using hbase::Append; +using hbase::Cell; +using hbase::CellType; +using hbase::Mutation; +using hbase::TimeUtil; + +const constexpr int64_t Mutation::kLatestTimestamp; + +TEST(Append, Row) { + Append append{"foo"}; + EXPECT_EQ("foo", append.row()); +} + +TEST(Append, Durability) { + Append append{"row"}; + EXPECT_EQ(hbase::pb::MutationProto_Durability_USE_DEFAULT, append.Durability()); + + auto skipWal = hbase::pb::MutationProto_Durability_SKIP_WAL; + append.SetDurability(skipWal); + EXPECT_EQ(skipWal, append.Durability()); +} + +TEST(Append, Timestamp) { + Append append{"row"}; + + // test default timestamp + EXPECT_EQ(Mutation::kLatestTimestamp, append.TimeStamp()); + + // set custom timestamp + auto ts = TimeUtil::ToMillis(TimeUtil::GetNowNanos()); + append.SetTimeStamp(ts); + EXPECT_EQ(ts, append.TimeStamp()); + + // Add a column with custom timestamp + append.Add("f", "q", "v"); + auto &cell = append.FamilyMap().at("f")[0]; + EXPECT_EQ(ts, cell->Timestamp()); +} + +TEST(Append, HasFamilies) { + Append append{"row"}; + + EXPECT_EQ(false, append.HasFamilies()); + + append.Add("f", "q", "v"); + EXPECT_EQ(true, append.HasFamilies()); +} + +TEST(Append, Add) { + CellType cell_type = CellType::PUT; + std::string row = "row"; + std::string family = "family"; + std::string column = "column"; + std::string value = "value"; + int64_t timestamp = std::numeric_limits<int64_t>::max(); + auto cell = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type); + + // add first cell + Append append{"row"}; + append.Add(std::move(cell)); + EXPECT_EQ(1, append.FamilyMap().size()); + EXPECT_EQ(1, append.FamilyMap().at(family).size()); + + // add a non-matching row + auto cell2 = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type); + Append append2{"foo"}; + 
ASSERT_THROW(append2.Add(std::move(cell2)), std::runtime_error); // rows don't match + + // add a second cell with same family + auto cell3 = std::make_unique<Cell>(row, family, "column-2", timestamp, value, cell_type); + append.Add(std::move(cell3)); + EXPECT_EQ(1, append.FamilyMap().size()); + EXPECT_EQ(2, append.FamilyMap().at(family).size()); + + // add a cell to a different family + auto cell4 = std::make_unique<Cell>(row, "family-2", "column-2", timestamp, value, cell_type); + append.Add(std::move(cell4)); + EXPECT_EQ(2, append.FamilyMap().size()); + EXPECT_EQ(1, append.FamilyMap().at("family-2").size()); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/test/async-batch-rpc-retrying-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/src/test/async-batch-rpc-retrying-test.cc b/hbase-native-client/src/test/async-batch-rpc-retrying-test.cc new file mode 100644 index 0000000..b0f4afb --- /dev/null +++ b/hbase-native-client/src/test/async-batch-rpc-retrying-test.cc @@ -0,0 +1,577 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include <folly/Logging.h> +#include <folly/Memory.h> +#include <folly/futures/Future.h> +#include <folly/io/async/EventBase.h> +#include <folly/io/async/ScopedEventBaseThread.h> +#include <gtest/gtest.h> +#include <wangle/concurrent/IOThreadPoolExecutor.h> + +#include <chrono> +#include <functional> +#include <string> + +#include "hbase/connection/rpc-client.h" +#include "hbase/client/async-batch-rpc-retrying-caller.h" +#include "hbase/client/async-connection.h" +#include "hbase/client/async-rpc-retrying-caller-factory.h" +#include "hbase/client/client.h" +#include "hbase/client/connection-configuration.h" +#include "hbase/client/keyvalue-codec.h" +#include "hbase/client/region-location.h" +#include "hbase/client/result.h" +#include "hbase/exceptions/exception.h" +#include "hbase/test-util/test-util.h" +#include "hbase/utils/time-util.h" + +using hbase::AsyncRpcRetryingCallerFactory; +using hbase::AsyncConnection; +using hbase::AsyncRegionLocator; +using hbase::ConnectionConfiguration; +using hbase::Configuration; +using hbase::HBaseRpcController; +using hbase::RegionLocation; +using hbase::RegionLocateType; +using hbase::RpcClient; +using hbase::RequestConverter; +using hbase::ResponseConverter; +using hbase::Put; +using hbase::TimeUtil; +using hbase::Client; +using hbase::security::User; + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +using namespace hbase; + +using folly::exception_wrapper; + +class AsyncBatchRpcRetryTest : public ::testing::Test { + public: + static std::unique_ptr<hbase::TestUtil> test_util; + static std::string tableName; + + static void SetUpTestCase() { + google::InstallFailureSignalHandler(); + test_util = std::make_unique<hbase::TestUtil>(); + test_util->StartMiniCluster(2); + std::vector<std::string> keys{"test0", "test100", "test200", "test300", "test400", + "test500", "test600", "test700", "test800", "test900"}; + tableName = "split-table1"; + test_util->CreateTable(tableName, "d", keys); + } +}; 
+std::unique_ptr<hbase::TestUtil> AsyncBatchRpcRetryTest::test_util = nullptr; +std::string AsyncBatchRpcRetryTest::tableName; + +class AsyncRegionLocatorBase : public AsyncRegionLocator { + public: + AsyncRegionLocatorBase() {} + explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location) + : region_location_(region_location) {} + virtual ~AsyncRegionLocatorBase() = default; + + folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &, + const std::string &row, + const RegionLocateType, + const int64_t) override { + folly::Promise<std::shared_ptr<RegionLocation>> promise; + promise.setValue(region_locations_.at(row)); + return promise.getFuture(); + } + + virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) { + region_location_ = region_location; + } + + virtual void set_region_location( + const std::map<std::string, std::shared_ptr<RegionLocation>> &reg_locs) { + for (auto reg_loc : reg_locs) { + region_locations_[reg_loc.first] = reg_loc.second; + } + } + + void UpdateCachedLocation(const RegionLocation &rl, const folly::exception_wrapper &ew) override { + } + + protected: + std::shared_ptr<RegionLocation> region_location_; + std::map<std::string, std::shared_ptr<RegionLocation>> region_locations_; + std::map<std::string, uint32_t> mtries_; + std::map<std::string, uint32_t> mnum_fails_; + + void InitRetryMaps(uint32_t num_fails) { + if (mtries_.size() == 0 && mnum_fails_.size() == 0) { + for (auto reg_loc : region_locations_) { + mtries_[reg_loc.first] = 0; + mnum_fails_[reg_loc.first] = num_fails; + } + } + } +}; + +class MockAsyncRegionLocator : public AsyncRegionLocatorBase { + public: + MockAsyncRegionLocator() : AsyncRegionLocatorBase() {} + explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) + : AsyncRegionLocatorBase(region_location) {} + virtual ~MockAsyncRegionLocator() {} +}; + +class MockWrongRegionAsyncRegionLocator : public
AsyncRegionLocatorBase { + private: + uint32_t counter_ = 0; + uint32_t num_fails_ = 0; + uint32_t tries_ = 0; + + public: + explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails) + : AsyncRegionLocatorBase(), num_fails_(num_fails) {} + explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) + : AsyncRegionLocatorBase(region_location) {} + virtual ~MockWrongRegionAsyncRegionLocator() {} + + folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion( + const hbase::pb::TableName &tn, const std::string &row, + const RegionLocateType locate_type = RegionLocateType::kCurrent, + const int64_t locate_ns = 0) override { + InitRetryMaps(num_fails_); + auto &tries = mtries_[row]; + auto &num_fails = mnum_fails_[row]; + if (++tries > num_fails) { + return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns); + } + + folly::Promise<std::shared_ptr<RegionLocation>> promise; + /* set random region name, simulating invalid region */ + auto result = std::make_shared<RegionLocation>("whatever-region-name", + region_locations_.at(row)->region_info(), + region_locations_.at(row)->server_name()); + promise.setValue(result); + return promise.getFuture(); + } +}; + +class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase { + private: + uint32_t tries_ = 0; + uint32_t num_fails_ = 0; + uint32_t counter_ = 0; + + public: + explicit MockFailingAsyncRegionLocator(uint32_t num_fails) + : AsyncRegionLocatorBase(), num_fails_(num_fails) {} + explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) + : AsyncRegionLocatorBase(region_location) {} + virtual ~MockFailingAsyncRegionLocator() {} + folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion( + const hbase::pb::TableName &tn, const std::string &row, + const RegionLocateType locate_type = RegionLocateType::kCurrent, + const int64_t locate_ns = 0) override { + InitRetryMaps(num_fails_); + auto &tries = mtries_[row]; + 
auto &num_fails = mnum_fails_[row]; + if (++tries > num_fails) { + return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns); + } + + folly::Promise<std::shared_ptr<RegionLocation>> promise; + promise.setException(std::runtime_error{"Failed to look up region location"}); + return promise.getFuture(); + } +}; + +class MockAsyncConnection : public AsyncConnection, + public std::enable_shared_from_this<MockAsyncConnection> { + public: + MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf, + std::shared_ptr<folly::HHWheelTimer> retry_timer, + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor, + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, + std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor, + std::shared_ptr<RpcClient> rpc_client, + std::shared_ptr<AsyncRegionLocator> region_locator) + : conn_conf_(conn_conf), + retry_timer_(retry_timer), + cpu_executor_(cpu_executor), + io_executor_(io_executor), + retry_executor_(retry_executor), + rpc_client_(rpc_client), + region_locator_(region_locator) {} + ~MockAsyncConnection() {} + void Init() { + caller_factory_ = + std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_); + } + + std::shared_ptr<Configuration> conf() override { return nullptr; } + std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; } + std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override { + return caller_factory_; + } + std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; } + std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; } + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; } + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; } + std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override { + return retry_executor_; + } + + void Close() override { 
+ retry_timer_->destroy(); + retry_executor_->stop(); + io_executor_->stop(); + cpu_executor_->stop(); + } + std::shared_ptr<HBaseRpcController> CreateRpcController() override { + return std::make_shared<HBaseRpcController>(); + } + + private: + std::shared_ptr<folly::HHWheelTimer> retry_timer_; + std::shared_ptr<ConnectionConfiguration> conn_conf_; + std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_; + std::shared_ptr<RpcClient> rpc_client_; + std::shared_ptr<AsyncRegionLocator> region_locator_; + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; + std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_; +}; + +class MockRawAsyncTableImpl { + public: + explicit MockRawAsyncTableImpl(std::shared_ptr<MockAsyncConnection> conn, + std::shared_ptr<hbase::pb::TableName> tn) + : conn_(conn), tn_(tn) {} + virtual ~MockRawAsyncTableImpl() = default; + + /* implement this in real RawAsyncTableImpl. */ + template <typename REQ, typename RESP> + folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ> &rows) { + /* init request caller builder */ + auto builder = conn_->caller_factory()->Batch<REQ, RESP>(); + + /* call with retry to get result */ + auto async_caller = + builder->table(tn_) + ->actions(std::make_shared<std::vector<REQ>>(rows)) + ->rpc_timeout(conn_->connection_conf()->read_rpc_timeout()) + ->operation_timeout(conn_->connection_conf()->operation_timeout()) + ->pause(conn_->connection_conf()->pause()) + ->max_attempts(conn_->connection_conf()->max_retries()) + ->start_log_errors_count(conn_->connection_conf()->start_log_errors_count()) + ->Build(); + + return async_caller->Call().then([async_caller](auto r) { return r; }); + } + + private: + std::shared_ptr<MockAsyncConnection> conn_; + std::shared_ptr<hbase::pb::TableName> tn_; +}; + +std::shared_ptr<MockAsyncConnection> getAsyncConnection( + Client &client, uint32_t operation_timeout_millis, uint32_t 
tries, + std::shared_ptr<AsyncRegionLocatorBase> region_locator) { + /* init region location and rpc channel */ + auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4); + auto io_executor_ = client.async_connection()->io_executor(); + auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1); + auto codec = std::make_shared<hbase::KeyValueCodec>(); + auto rpc_client = std::make_shared<RpcClient>(io_executor_, cpu_executor_, codec, + AsyncBatchRpcRetryTest::test_util->conf()); + std::shared_ptr<folly::HHWheelTimer> retry_timer = + folly::HHWheelTimer::newTimer(retry_executor_->getEventBase()); + + /* init connection configuration */ + auto connection_conf = std::make_shared<ConnectionConfiguration>( + TimeUtil::SecondsToNanos(20), // connect_timeout + TimeUtil::MillisToNanos(operation_timeout_millis), // operation_timeout + TimeUtil::SecondsToNanos(60), // rpc_timeout + TimeUtil::MillisToNanos(100), // pause + tries, // max retries + 1); // start log errors count + + return std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_, + io_executor_, retry_executor_, rpc_client, + region_locator); +} + +template <typename ACTION> +std::vector<std::shared_ptr<hbase::Row>> getRows(std::vector<ACTION> actions) { + std::vector<std::shared_ptr<hbase::Row>> rows; + for (auto action : actions) { + std::shared_ptr<hbase::Row> srow = std::make_shared<ACTION>(action); + rows.push_back(srow); + } + return rows; +} + +template <typename REQ, typename RESP> +std::vector<std::shared_ptr<hbase::Result>> getResults(std::vector<REQ> &actions, + std::vector<folly::Try<RESP>> &tresults) { + std::vector<std::shared_ptr<hbase::Result>> results{}; + uint64_t num = 0; + for (auto tresult : tresults) { + if (tresult.hasValue()) { + results.push_back(tresult.value()); + } else if (tresult.hasException()) { + folly::exception_wrapper ew = tresult.exception(); + LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for " + << 
actions[num].row(); + throw ew; + } + ++num; + } + return results; +} + +template <typename ACTION> +std::map<std::string, std::shared_ptr<RegionLocation>> getRegionLocationsAndActions( + uint64_t num_rows, std::vector<ACTION> &actions, std::shared_ptr<Table> table) { + std::map<std::string, std::shared_ptr<RegionLocation>> region_locations; + for (uint64_t i = 0; i < num_rows; ++i) { + auto row = "test" + std::to_string(i); + ACTION action(row); + actions.push_back(action); + region_locations[row] = table->GetRegionLocation(row); + } + return region_locations; +} + +void runMultiGets(std::shared_ptr<AsyncRegionLocatorBase> region_locator, + const std::string &table_name, bool split_regions, uint32_t tries = 3, + uint32_t operation_timeout_millis = 600000, uint64_t num_rows = 1000) { + // Create TableName and Row to be fetched from HBase + auto tn = folly::to<hbase::pb::TableName>(AsyncBatchRpcRetryTest::tableName); + + // Create a client + Client client(*AsyncBatchRpcRetryTest::test_util->conf()); + + // Get connection to HBase Table + std::shared_ptr<Table> table = client.Table(tn); + + for (uint64_t i = 0; i < num_rows; i++) { + table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i), + "value" + std::to_string(i))); + } + std::vector<hbase::Get> gets; + auto region_locations = getRegionLocationsAndActions<hbase::Get>(num_rows, gets, table); + + /* set region locator */ + region_locator->set_region_location(region_locations); + + /* init hbase client connection */ + auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator); + conn->Init(); + + /* init retry caller factory */ + auto tableImpl = + std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn)); + + std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Get>(gets); + auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get( + milliseconds(operation_timeout_millis)); + 
ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty."; + + auto results = getResults<hbase::Get, std::shared_ptr<Result>>(gets, tresults); + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty."; + uint32_t i = 0; + for (; i < num_rows; ++i) { + ASSERT_TRUE(!results[i]->IsEmpty()) << "Result for Get " << gets[i].row() + << " must not be empty"; + EXPECT_EQ("test" + std::to_string(i), results[i]->Row()); + EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value()); + } + + table->Close(); + client.Close(); + conn->Close(); +} + +void runMultiPuts(std::shared_ptr<AsyncRegionLocatorBase> region_locator, + const std::string &table_name, bool split_regions, uint32_t tries = 3, + uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 1000) { + // Create TableName and Row to be fetched from HBase + auto tn = folly::to<hbase::pb::TableName>(AsyncBatchRpcRetryTest::tableName); + + // Create a client + Client client(*AsyncBatchRpcRetryTest::test_util->conf()); + + // Get connection to HBase Table + std::shared_ptr<Table> table = client.Table(tn); + + std::vector<hbase::Put> puts; + auto region_locations = getRegionLocationsAndActions<hbase::Put>(num_rows, puts, table); + + /* set region locator */ + region_locator->set_region_location(region_locations); + + /* init hbase client connection */ + auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator); + conn->Init(); + + /* init retry caller factory */ + auto tableImpl = + std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn)); + + std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Put>(puts); + auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get( + milliseconds(operation_timeout_millis)); + ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty."; + + auto results = 
getResults<hbase::Put, std::shared_ptr<Result>>(puts, tresults); + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty."; + + table->Close(); + client.Close(); + conn->Close(); +} + +// Test successful case +TEST_F(AsyncBatchRpcRetryTest, MultiGets) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockAsyncRegionLocator>()); + runMultiGets(region_locator, "table1", false); +} + +// Tests the RPC failing 3 times, then succeeding +TEST_F(AsyncBatchRpcRetryTest, HandleException) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockWrongRegionAsyncRegionLocator>(3)); + runMultiGets(region_locator, "table2", false, 5); +} + +// Tests the RPC failing 4 times, throwing an exception +TEST_F(AsyncBatchRpcRetryTest, FailWithException) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockWrongRegionAsyncRegionLocator>(4)); + EXPECT_ANY_THROW(runMultiGets(region_locator, "table3", false)); +} + +// Tests the region location lookup failing 3 times, then succeeding +TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookup) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(3)); + runMultiGets(region_locator, "table4", false); +} + +// Tests the region location lookup failing 5 times, throwing an exception +TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookup) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(4)); + EXPECT_ANY_THROW(runMultiGets(region_locator, "table5", false, 3)); +} + +// Tests hitting operation timeout, thus not retrying anymore +TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeout) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(6)); + 
EXPECT_ANY_THROW(runMultiGets(region_locator, "table6", false, 5, 100, 1000)); +} + +////////////////////// +// Test successful case +TEST_F(AsyncBatchRpcRetryTest, MultiPuts) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockAsyncRegionLocator>()); + runMultiPuts(region_locator, "table1", false); +} + +// Tests the RPC failing 3 times, then succeeding +TEST_F(AsyncBatchRpcRetryTest, PutsHandleException) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockWrongRegionAsyncRegionLocator>(3)); + runMultiPuts(region_locator, "table2", false, 5); +} + +// Tests the RPC failing 4 times, throwing an exception +TEST_F(AsyncBatchRpcRetryTest, PutsFailWithException) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockWrongRegionAsyncRegionLocator>(4)); + EXPECT_ANY_THROW(runMultiPuts(region_locator, "table3", false)); +} + +// Tests the region location lookup failing 3 times, then succeeding +TEST_F(AsyncBatchRpcRetryTest, PutsHandleExceptionFromRegionLocationLookup) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(3)); + runMultiPuts(region_locator, "table4", false); +} + +// Tests the region location lookup failing 5 times, throwing an exception +TEST_F(AsyncBatchRpcRetryTest, PutsFailWithExceptionFromRegionLocationLookup) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(4)); + EXPECT_ANY_THROW(runMultiPuts(region_locator, "table5", false, 3)); +} + +// Tests hitting operation timeout, thus not retrying anymore +TEST_F(AsyncBatchRpcRetryTest, PutsFailWithOperationTimeout) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(6)); + EXPECT_ANY_THROW(runMultiPuts(region_locator, "table6", false, 5, 100, 1000)); +} + + // Test successful case + TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) { + 
std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockAsyncRegionLocator>()); + runMultiGets(region_locator, "table7", true); + } + + // Tests the RPC failing 3 times, then succeeding + TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockWrongRegionAsyncRegionLocator>(3)); + runMultiGets(region_locator, "table8", true, 5); + } + + // Tests the RPC failing 4 times, throwing an exception + TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockWrongRegionAsyncRegionLocator>(4)); + EXPECT_ANY_THROW(runMultiGets(region_locator, "table9", true)); + } + + // Tests the region location lookup failing 3 times, then succeeding + TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(3)); + runMultiGets(region_locator, "table10", true); + } + + // Tests the region location lookup failing 5 times, throwing an exception + TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(4)); + EXPECT_ANY_THROW(runMultiGets(region_locator, "table11", true, 3)); + } + + // Tests hitting operation timeout, thus not retrying anymore + TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(6)); + EXPECT_ANY_THROW(runMultiGets(region_locator, "table12", true, 5, 100, 1000)); + } http://git-wip-us.apache.org/repos/asf/hbase/blob/24f7f093/hbase-native-client/src/test/async-rpc-retrying-test.cc ---------------------------------------------------------------------- diff --git 
a/hbase-native-client/src/test/async-rpc-retrying-test.cc b/hbase-native-client/src/test/async-rpc-retrying-test.cc new file mode 100644 index 0000000..b590b43 --- /dev/null +++ b/hbase-native-client/src/test/async-rpc-retrying-test.cc @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include <folly/Logging.h> +#include <folly/Memory.h> +#include <folly/futures/Future.h> +#include <folly/io/async/EventBase.h> +#include <folly/io/async/ScopedEventBaseThread.h> +#include <gmock/gmock.h> +#include <wangle/concurrent/IOThreadPoolExecutor.h> + +#include <chrono> +#include <functional> +#include <string> + +#include "hbase/connection/request.h" +#include "hbase/connection/response.h" +#include "hbase/connection/rpc-client.h" +#include "hbase/client/async-connection.h" +#include "hbase/client/async-rpc-retrying-caller-factory.h" +#include "hbase/client/async-rpc-retrying-caller.h" +#include "hbase/client/client.h" +#include "hbase/client/connection-configuration.h" +#include "hbase/client/hbase-rpc-controller.h" +#include "hbase/client/keyvalue-codec.h" +#include "hbase/client/region-location.h" +#include "hbase/client/request-converter.h" +#include "hbase/client/response-converter.h" +#include "hbase/client/result.h" +#include "hbase/exceptions/exception.h" +#include "hbase/if/Client.pb.h" +#include "hbase/if/HBase.pb.h" +#include "hbase/test-util/test-util.h" +#include "hbase/utils/time-util.h" + +using hbase::AsyncRpcRetryingCallerFactory; +using hbase::AsyncConnection; +using hbase::AsyncRegionLocator; +using hbase::ConnectionConfiguration; +using hbase::Configuration; +using hbase::HBaseRpcController; +using hbase::RegionLocation; +using hbase::RegionLocateType; +using hbase::RpcClient; +using hbase::RequestConverter; +using hbase::ResponseConverter; +using hbase::ReqConverter; +using hbase::RespConverter; +using hbase::Put; +using hbase::TimeUtil; +using hbase::Client; +using hbase::security::User; + +using ::testing::Return; +using ::testing::_; +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +using namespace hbase; + +using folly::exception_wrapper; + +class AsyncRpcRetryTest : public ::testing::Test { + public: + static std::unique_ptr<hbase::TestUtil> test_util; + + static void SetUpTestCase() { + 
/* gtest per-test-case setup: installs the failure signal handler and starts the mini cluster + * that every test in this file reaches through test_util. */ + google::InstallFailureSignalHandler(); + test_util = std::make_unique<hbase::TestUtil>(); + test_util->StartMiniCluster(2); + } +}; +std::unique_ptr<hbase::TestUtil> AsyncRpcRetryTest::test_util = nullptr; + +/* + * Region locator stub: every LocateRegion() call, whatever its arguments, returns an already + * fulfilled future holding the location last passed to the constructor or to + * set_region_location(). UpdateCachedLocation() does nothing. + */ +class AsyncRegionLocatorBase : public AsyncRegionLocator { + public: + AsyncRegionLocatorBase() {} + explicit AsyncRegionLocatorBase(std::shared_ptr<RegionLocation> region_location) + : region_location_(region_location) {} + virtual ~AsyncRegionLocatorBase() = default; + + folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion(const hbase::pb::TableName &, + const std::string &, + const RegionLocateType, + const int64_t) override { + folly::Promise<std::shared_ptr<RegionLocation>> promise; + promise.setValue(region_location_); + return promise.getFuture(); + } + + virtual void set_region_location(std::shared_ptr<RegionLocation> region_location) { + region_location_ = region_location; + } + + void UpdateCachedLocation(const RegionLocation &, const folly::exception_wrapper &) override {} + + protected: + std::shared_ptr<RegionLocation> region_location_; +}; + +/* Locator that always succeeds: adds no behaviour of its own to AsyncRegionLocatorBase. */ +class MockAsyncRegionLocator : public AsyncRegionLocatorBase { + public: + MockAsyncRegionLocator() : AsyncRegionLocatorBase() {} + explicit MockAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) + : AsyncRegionLocatorBase(region_location) {} + virtual ~MockAsyncRegionLocator() {} +}; + +/* + * Locator that returns a location carrying a bogus region name for its first num_fails_ + 1 + * lookups and the real location afterwards. The constructor taking a RegionLocation leaves + * num_fails_ at 0. + */ +class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase { + private: + uint32_t tries_ = 0; + uint32_t num_fails_ = 0; + + public: + explicit MockWrongRegionAsyncRegionLocator(uint32_t num_fails) + : AsyncRegionLocatorBase(), num_fails_(num_fails) {} + explicit MockWrongRegionAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) + : AsyncRegionLocatorBase(region_location) {} + virtual ~MockWrongRegionAsyncRegionLocator() {} + + folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion( + const hbase::pb::TableName &tn, const std::string &row, + const 
RegionLocateType locate_type = RegionLocateType::kCurrent, + const int64_t locate_ns = 0) override { + // Fails for the first num_fails_ + 1 calls (tries_ is post-incremented and compared with '>'), + // then delegates to the super class which will give the correct region location. + if (tries_++ > num_fails_) { + return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns); + } + folly::Promise<std::shared_ptr<RegionLocation>> promise; + /* use a fixed bogus region name with the real region info and server name, + * simulating invalid region */ + auto result = std::make_shared<RegionLocation>( + "whatever-region-name", region_location_->region_info(), region_location_->server_name()); + promise.setValue(result); + return promise.getFuture(); + } +}; + +/* + * Locator whose first num_fails_ + 1 lookups complete with a std::runtime_error; later lookups + * return the real location. The constructor taking a RegionLocation leaves num_fails_ at 0. + */ +class MockFailingAsyncRegionLocator : public AsyncRegionLocatorBase { + private: + uint32_t tries_ = 0; + uint32_t num_fails_ = 0; + + public: + explicit MockFailingAsyncRegionLocator(uint32_t num_fails) + : AsyncRegionLocatorBase(), num_fails_(num_fails) {} + explicit MockFailingAsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) + : AsyncRegionLocatorBase(region_location) {} + virtual ~MockFailingAsyncRegionLocator() {} + folly::Future<std::shared_ptr<hbase::RegionLocation>> LocateRegion( + const hbase::pb::TableName &tn, const std::string &row, + const RegionLocateType locate_type = RegionLocateType::kCurrent, + const int64_t locate_ns = 0) override { + // Fails for the first num_fails_ + 1 calls (tries_ is post-incremented and compared with '>'), + // then delegates to the super class which will give the correct region location. 
+ if (tries_++ > num_fails_) { + return AsyncRegionLocatorBase::LocateRegion(tn, row, locate_type, locate_ns); + } + folly::Promise<std::shared_ptr<RegionLocation>> promise; + promise.setException(std::runtime_error{"Failed to look up region location"}); + return promise.getFuture(); + } +}; + +/* + * AsyncConnection assembled from collaborators supplied by the test. Accessors return the objects + * passed to the constructor; conf() returns nullptr, Close() is a no-op, and caller_factory() + * is null until Init() has run. + */ +class MockAsyncConnection : public AsyncConnection, + public std::enable_shared_from_this<MockAsyncConnection> { + public: + MockAsyncConnection(std::shared_ptr<ConnectionConfiguration> conn_conf, + std::shared_ptr<folly::HHWheelTimer> retry_timer, + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor, + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, + std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor, + std::shared_ptr<RpcClient> rpc_client, + std::shared_ptr<AsyncRegionLocator> region_locator) + : conn_conf_(conn_conf), + retry_timer_(retry_timer), + cpu_executor_(cpu_executor), + io_executor_(io_executor), + retry_executor_(retry_executor), + rpc_client_(rpc_client), + region_locator_(region_locator) {} + ~MockAsyncConnection() {} + /* Creates the caller factory. Relies on shared_from_this(), so the object must already be + * owned by a std::shared_ptr: call this after construction, never from the constructor. */ + void Init() { + caller_factory_ = + std::make_shared<AsyncRpcRetryingCallerFactory>(shared_from_this(), retry_timer_); + } + + std::shared_ptr<Configuration> conf() override { return nullptr; } + std::shared_ptr<ConnectionConfiguration> connection_conf() override { return conn_conf_; } + std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override { + return caller_factory_; + } + std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; } + std::shared_ptr<AsyncRegionLocator> region_locator() override { return region_locator_; } + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; } + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; } + std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override { + return retry_executor_; + } + + void Close() override {} + 
std::shared_ptr<HBaseRpcController> CreateRpcController() override { + return std::make_shared<HBaseRpcController>(); + } + + private: + std::shared_ptr<folly::HHWheelTimer> retry_timer_; + std::shared_ptr<ConnectionConfiguration> conn_conf_; + std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_; + std::shared_ptr<RpcClient> rpc_client_; + std::shared_ptr<AsyncRegionLocator> region_locator_; + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; + std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_; +}; + +/* + * Minimal stand-in for the table implementation: sends one Get to the server named in a + * RegionLocation and converts the response into an hbase::Result. conn_ is stored by the + * constructor but not read by GetCall() or Call(). + */ +template <typename CONN> +class MockRawAsyncTableImpl { + public: + explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn) : conn_(conn) {} + virtual ~MockRawAsyncTableImpl() = default; + + /* implement this in real RawAsyncTableImpl. */ + + /* + * Issues the Get through rpc_client->AsyncCall() against loc's host name and port, using + * RequestConverter::ToGetRequest and ResponseConverter::FromGetResponse for the conversions. + * In real RawAsyncTableImpl, this should be private. + */ + folly::Future<std::shared_ptr<hbase::Result>> GetCall( + std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller, + std::shared_ptr<RegionLocation> loc, const hbase::Get &get) { + hbase::RpcCall<hbase::Request, hbase::Response> rpc_call = []( + std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<RegionLocation> loc, + std::shared_ptr<HBaseRpcController> controller, + std::unique_ptr<hbase::Request> preq) -> folly::Future<std::unique_ptr<hbase::Response>> { + VLOG(1) << "entering MockRawAsyncTableImpl#GetCall, calling AsyncCall, loc:" + << loc->DebugString(); + return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), + std::move(preq), User::defaultUser(), "ClientService"); + }; + + return Call<hbase::Get, hbase::Request, hbase::Response, std::shared_ptr<hbase::Result>>( + rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call, + &hbase::ResponseConverter::FromGetResponse); + } + + /* in real RawAsyncTableImpl, this should be private. 
*/ + /* + * Runs one RPC: converts req with req_converter, hands it to rpc_call, and completes the + * returned future with resp_converter's result, or with the exception if the call fails. + * The pending promise lives in the promise_ member, which every Call() replaces, so + * overlapping calls on one instance would complete the wrong promise. promise_ is typed for + * std::shared_ptr<hbase::Result> regardless of RESP. + */ + template <typename REQ, typename PREQ, typename PRESP, typename RESP> + folly::Future<RESP> Call(std::shared_ptr<hbase::RpcClient> rpc_client, + std::shared_ptr<HBaseRpcController> controller, + std::shared_ptr<RegionLocation> loc, const REQ &req, + ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter, + hbase::RpcCall<PREQ, PRESP> rpc_call, + RespConverter<RESP, PRESP> resp_converter) { + promise_ = std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>(); + auto f = promise_->getFuture(); + VLOG(1) << "calling rpc_call"; + rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name()))) + .then([&, this, resp_converter](std::unique_ptr<PRESP> presp) { + VLOG(1) << "MockRawAsyncTableImpl#call succeded: "; + RESP result = resp_converter(*presp); + promise_->setValue(result); + }) + .onError([this](const exception_wrapper &e) { + VLOG(1) << "entering MockRawAsyncTableImpl#call, exception: " << e.what(); + VLOG(1) << "entering MockRawAsyncTableImpl#call, error typeinfo: " << typeid(e).name(); + promise_->setException(e); + }); + return f; + } + + private: + std::shared_ptr<CONN> conn_; + std::shared_ptr<folly::Promise<std::shared_ptr<hbase::Result>>> promise_; +}; + +/* + * Shared test body: creates the table, writes two cells to row "test2" through a regular Client, + * then reads the row back with a single-row retrying caller wired to region_locator and checks + * the returned cells. operation_timeout_millis becomes the connection's operation timeout + * (default 1200000 ms). An exception from the call propagates to the caller of runTest(), + * in which case the cleanup at the end of the function is skipped. + */ +void runTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator, std::string tableName, + uint32_t operation_timeout_millis = 1200000) { + AsyncRpcRetryTest::test_util->CreateTable(tableName, "d"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to<hbase::pb::TableName>(tableName); + auto row = "test2"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Create a client + Client client(*(AsyncRpcRetryTest::test_util->conf())); + + // Get connection to HBase Table + auto table = client.Table(tn); + + table->Put(Put{"test2"}.AddColumn("d", "2", "value2")); + table->Put(Put{"test2"}.AddColumn("d", "extra", "value for extra")); + + /* init region location and rpc channel */ + auto region_location = 
table->GetRegionLocation(row); + + // auto io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(4); + auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4); + auto io_executor_ = client.async_connection()->io_executor(); + auto retry_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1); + auto codec = std::make_shared<hbase::KeyValueCodec>(); + auto rpc_client = std::make_shared<RpcClient>(io_executor_, cpu_executor_, codec, + AsyncRpcRetryTest::test_util->conf()); + // auto retry_event_base_ = std::make_shared<folly::ScopedEventBaseThread>(true); + std::shared_ptr<folly::HHWheelTimer> retry_timer = + folly::HHWheelTimer::newTimer(retry_executor_->getEventBase()); + + /* init connection configuration */ + auto connection_conf = std::make_shared<ConnectionConfiguration>( + TimeUtil::SecondsToNanos(20), // connect_timeout + TimeUtil::MillisToNanos(operation_timeout_millis), // operation_timeout + TimeUtil::SecondsToNanos(60), // rpc_timeout + TimeUtil::MillisToNanos(100), // pause + 5, // max retries + 9); // start log errors count + + /* give the locator the real location, which AsyncRegionLocatorBase::LocateRegion() returns */ + region_locator->set_region_location(region_location); + + /* init mock connection; Init() creates the retry caller factory */ + auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_, + io_executor_, retry_executor_, rpc_client, + region_locator); + conn->Init(); + + /* init the mock table implementation that issues the actual Get RPC */ + auto tableImpl = std::make_shared<MockRawAsyncTableImpl<MockAsyncConnection>>(conn); + + /* init request caller builder */ + auto builder = conn->caller_factory()->Single<std::shared_ptr<hbase::Result>>(); + + /* build the single-row caller; the action captures get by reference, which is safe because + * runTest waits for the result before returning */ + + auto async_caller = + builder->table(std::make_shared<hbase::pb::TableName>(tn)) + ->row(row) + ->rpc_timeout(conn->connection_conf()->read_rpc_timeout()) + ->operation_timeout(conn->connection_conf()->operation_timeout()) + ->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller, + 
std::shared_ptr<hbase::RegionLocation> loc, + std::shared_ptr<hbase::RpcClient> rpc_client) + -> folly::Future<std::shared_ptr<hbase::Result>> { + return tableImpl->GetCall(rpc_client, controller, loc, get); + }) + ->Build(); + + /* NOTE(review): this promise is never used in the rest of runTest. */ + auto promise = std::make_shared<folly::Promise<std::shared_ptr<hbase::Result>>>(); + + /* waits for the call; an exception thrown here leaves runTest and skips the cleanup below */ + auto result = async_caller->Call().get(milliseconds(500000)); + + // Check the values against what the two Puts at the top of runTest wrote + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test2", result->Row()); + EXPECT_EQ("value2", *(result->Value("d", "2"))); + EXPECT_EQ("value for extra", *(result->Value("d", "extra"))); + + retry_timer->destroy(); + table->Close(); + client.Close(); + retry_executor_->stop(); +} + +// Test successful case +TEST_F(AsyncRpcRetryTest, TestGetBasic) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockAsyncRegionLocator>()); + runTest(region_locator, "table1"); +} + +// The locator hands out a wrong region for its first 4 lookups (num_fails 3 => 3 + 1 failures); +// the Get must still succeed +TEST_F(AsyncRpcRetryTest, TestHandleException) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockWrongRegionAsyncRegionLocator>(3)); + runTest(region_locator, "table2"); +} + +// The locator hands out a wrong region for its first 6 lookups (num_fails 5 => 5 + 1 failures); +// runTest must throw +TEST_F(AsyncRpcRetryTest, TestFailWithException) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockWrongRegionAsyncRegionLocator>(5)); + EXPECT_ANY_THROW(runTest(region_locator, "table3")); +} + +// The region location lookup fails for its first 4 calls (num_fails 3 => 3 + 1 failures), then +// succeeds; the Get must still succeed +TEST_F(AsyncRpcRetryTest, TestHandleExceptionFromRegionLocationLookup) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(3)); + runTest(region_locator, "table4"); +} + +// The region location lookup fails for its first 6 calls (num_fails 5 => 5 + 1 failures); +// runTest must throw +TEST_F(AsyncRpcRetryTest, TestFailWithExceptionFromRegionLocationLookup) { + 
std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(5)); + EXPECT_ANY_THROW(runTest(region_locator, "table5")); +} + +// Tests hitting operation timeout (200 ms here, against the 100 ms retry pause set in runTest), +// thus not retrying anymore even though the locator would succeed from its 5th lookup on +TEST_F(AsyncRpcRetryTest, TestFailWithOperationTimeout) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(3)); + EXPECT_ANY_THROW(runTest(region_locator, "table6", 200)); +}