http://git-wip-us.apache.org/repos/asf/hbase/blob/dc0f0175/hbase-native-client/serde/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK
new file mode 100644
index 0000000..207607f
--- /dev/null
+++ b/hbase-native-client/serde/BUCK
@@ -0,0 +1,54 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Library target bundling the client serializer and deserializer.
+cxx_library(
+    name="serde",
+    exported_headers=[
+        "client-serializer.h",
+        "client-deserializer.h",
+    ],
+    srcs=[
+        "client-serializer.cc",
+        "client-deserializer.cc",
+    ],
+    deps=[
+        "//if:if",
+        "//third-party:folly",
+    ],
+    tests=[
+        ":client-serializer-test",
+        ":client-deserializer-test",
+    ],
+    visibility=["PUBLIC"],
+)
+
+# Unit tests for client-serializer.cc.
+cxx_test(
+    name="client-serializer-test",
+    srcs=["client-serializer-test.cc"],
+    deps=[
+        ":serde",
+        "//if:if",
+    ],
+)
+
+# Unit tests for client-deserializer.cc.
+cxx_test(
+    name="client-deserializer-test",
+    srcs=["client-deserializer-test.cc"],
+    deps=[
+        ":serde",
+        "//if:if",
+    ],
+)
http://git-wip-us.apache.org/repos/asf/hbase/blob/dc0f0175/hbase-native-client/serde/client-deserializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc
new file mode 100644
index 0000000..bb57e50
--- /dev/null
+++ b/hbase-native-client/serde/client-deserializer-test.cc
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+#include <folly/io/IOBuf.h>
+
+#include "serde/client-deserializer.h"
+#include "serde/client-serializer.h"
+#include "if/Client.pb.h"
+
+using namespace hbase;
+using folly::IOBuf;
+using hbase::pb::GetRequest;
+using hbase::pb::RegionSpecifier;
+using hbase::pb::RegionSpecifier_RegionSpecifierType;
+
+// Null arguments must be rejected with a negative return code.
+TEST(TestClientDeserializer, TestReturnFalseOnNullPtr) {
+  ClientDeserializer deser;
+  ASSERT_LT(deser.parse_delimited(nullptr, nullptr), 0);
+}
+
+// Bytes that are not a varint-delimited protobuf must be rejected with a
+// negative return code.
+TEST(TestClientDeserializer, TestReturnFalseOnBadInput) {
+  ClientDeserializer deser;
+  auto buf = IOBuf::copyBuffer("test");
+  GetRequest gr;
+
+  ASSERT_LT(deser.parse_delimited(buf.get(), &gr), 0);
+}
+
+// A message written by ClientSerializer::serialize_delimited must be read
+// back completely and with the same field values.
+TEST(TestClientDeserializer, TestGoodGetRequestFullRoundTrip) {
+  GetRequest in;
+  ClientSerializer ser;
+  ClientDeserializer deser;
+
+  // fill up the GetRequest.
+  in.mutable_region()->set_value("test_region_id");
+  in.mutable_region()->set_type(
+      RegionSpecifier_RegionSpecifierType::
+          RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME);
+  in.mutable_get()->set_row("test_row");
+
+  // Create the buffer
+  auto buf = ser.serialize_delimited(in);
+
+  GetRequest out;
+
+  int used_bytes = deser.parse_delimited(buf.get(), &out);
+
+  // Every byte of the buffer must have been consumed. ASSERT_GT guarantees
+  // the value is positive, so the cast to an unsigned type is safe.
+  ASSERT_GT(used_bytes, 0);
+  ASSERT_EQ(buf->length(), static_cast<size_t>(used_bytes));
+
+  // Consuming the right number of bytes is not enough: the decoded message
+  // has to carry the values that were serialized.
+  EXPECT_EQ(in.region().value(), out.region().value());
+  EXPECT_EQ(in.region().type(), out.region().type());
+  EXPECT_EQ(in.get().row(), out.get().row());
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/dc0f0175/hbase-native-client/serde/client-deserializer.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-deserializer.cc b/hbase-native-client/serde/client-deserializer.cc
new file mode 100644
index 0000000..118b0d1
--- /dev/null
+++ b/hbase-native-client/serde/client-deserializer.cc
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "serde/client-deserializer.h"
+
+#include <google/protobuf/message.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <folly/Logging.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <limits>
+
+using namespace hbase;
+
+using folly::IOBuf;
+using google::protobuf::Message;
+using google::protobuf::io::ArrayInputStream;
+using google::protobuf::io::CodedInputStream;
+
+/**
+ * Parse one varint32-length-delimited protobuf message from the front of buf
+ * and merge it into msg.
+ *
+ * @param buf Unchained buffer holding the length prefix followed by the
+ *            message bytes.
+ * @param msg Message that the decoded fields are merged into.
+ * @return the number of bytes consumed (length prefix included) on success,
+ *         or a negative code on failure:
+ *         -2 buf or msg is null,
+ *         -3 the varint length prefix could not be read,
+ *         -4 the payload is shorter than the prefix claims or failed to parse,
+ *         -5 the message ended before the declared length was consumed.
+ */
+int ClientDeserializer::parse_delimited(const IOBuf *buf, Message *msg) {
+  if (buf == nullptr || msg == nullptr) {
+    return -2;
+  }
+
+  // Only the first buffer of a chain is examined below.
+  DCHECK(!buf->isChained());
+
+  // ArrayInputStream takes an int size, so clamp rather than let an oversized
+  // buffer length wrap to a negative value.
+  const uint64_t max_view =
+      static_cast<uint64_t>(std::numeric_limits<int>::max());
+  const int view_size =
+      static_cast<int>(std::min<uint64_t>(buf->length(), max_view));
+
+  ArrayInputStream ais{buf->data(), view_size};
+  CodedInputStream coded_stream{&ais};
+
+  uint32_t msg_size = 0;
+
+  // Try and read the varint.
+  if (!coded_stream.ReadVarint32(&msg_size)) {
+    FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t";
+    return -3;
+  }
+
+  // Reject a payload that is shorter than its length prefix. Without this
+  // check, input truncated exactly on a field boundary reaches the end of the
+  // stream at a valid tag position and is reported as a complete message.
+  const int available = view_size - coded_stream.CurrentPosition();
+  if (msg_size > static_cast<uint32_t>(available)) {
+    FB_LOG_EVERY_MS(ERROR, 1000)
+        << "Delimited message is larger than the data available.";
+    return -4;
+  }
+
+  // msg_size <= available <= INT_MAX, so the narrowing cast is safe.
+  coded_stream.PushLimit(static_cast<int>(msg_size));
+  // Parse the message.
+  if (!msg->MergeFromCodedStream(&coded_stream)) {
+    FB_LOG_EVERY_MS(ERROR, 1000)
+        << "Unable to read a protobuf message from data.";
+    return -4;
+  }
+
+  // Make sure all the data was consumed.
+  if (!coded_stream.ConsumedEntireMessage()) {
+    FB_LOG_EVERY_MS(ERROR, 1000)
+        << "Orphaned data left after reading protobuf message";
+    return -5;
+  }
+
+  return coded_stream.CurrentPosition();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/dc0f0175/hbase-native-client/serde/client-deserializer.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-deserializer.h b/hbase-native-client/serde/client-deserializer.h
new file mode 100644
index 0000000..b9664b0
--- /dev/null
+++ b/hbase-native-client/serde/client-deserializer.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/io/IOBuf.h>
+
+// Forward declaration: this header only names Message through a pointer.
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
+
+namespace hbase {
+
+/**
+ * Reads length-delimited protobuf messages out of folly IOBufs.
+ */
+class ClientDeserializer {
+public:
+  /**
+   * Parse one varint-delimited message from buf into msg.
+   *
+   * @return the number of bytes consumed on success, a negative value on
+   *         failure.
+   */
+  int parse_delimited(const folly::IOBuf *buf, google::protobuf::Message *msg);
+};
+
+} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/dc0f0175/hbase-native-client/serde/client-serializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc
new file mode 100644
index 0000000..b32b55d
--- /dev/null
+++ b/hbase-native-client/serde/client-serializer-test.cc
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <gtest/gtest.h>
+
+#include <folly/io/Cursor.h>
+
+#include <string>
+
+#include "serde/client-serializer.h"
+#include "if/HBase.pb.h"
+#include "if/RPC.pb.h"
+
+using namespace hbase;
+using namespace hbase::pb;
+using namespace folly;
+using namespace folly::io;
+
+// The preamble starts with the magic string and is six bytes long in total.
+TEST(ClientSerializerTest, PreambleIncludesHBas) {
+  ClientSerializer ser;
+  auto buf = ser.preamble();
+  const char *p = reinterpret_cast<const char *>(buf->data());
+  // Take the first four chars and make sure they are the
+  // magic string
+  EXPECT_EQ("HBas", std::string(p, 4));
+
+  // Unsigned literal: computeChainDataLength() returns an unsigned value.
+  EXPECT_EQ(6u, buf->computeChainDataLength());
+}
+
+// Byte 4 of the preamble is the RPC version, byte 5 the auth type.
+TEST(ClientSerializerTest, PreambleIncludesVersion) {
+  ClientSerializer ser;
+  auto buf = ser.preamble();
+  EXPECT_EQ(0, static_cast<const uint8_t *>(buf->data())[4]);
+  EXPECT_EQ(80, static_cast<const uint8_t *>(buf->data())[5]);
+}
+
+// header() returns a chain: a 4 byte length buffer followed by the payload.
+TEST(ClientSerializerTest, TestHeaderLengthPrefixed) {
+  ClientSerializer ser;
+  auto header = ser.header("elliott");
+
+  // The header should be prefixed by 4 bytes of length.
+  EXPECT_EQ(4u, header->length());
+  EXPECT_TRUE(header->length() < header->computeChainDataLength());
+  EXPECT_TRUE(header->isChained());
+
+  // Now make sure the length is correct.
+  Cursor cursor(header.get());
+  auto prefixed_len = cursor.readBE<uint32_t>();
+  EXPECT_EQ(prefixed_len, header->next()->length());
+}
+
+// The payload following the length prefix decodes as a ConnectionHeader that
+// carries both the user and the service name set by header().
+TEST(ClientSerializerTest, TestHeaderDecode) {
+  ClientSerializer ser;
+  auto buf = ser.header("elliott");
+  auto header_buf = buf->next();
+  ConnectionHeader h;
+
+  EXPECT_TRUE(h.ParseFromArray(header_buf->data(), header_buf->length()));
+  EXPECT_EQ("elliott", h.user_info().effective_user());
+  EXPECT_EQ("ClientService", h.service_name());
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/dc0f0175/hbase-native-client/serde/client-serializer.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-serializer.cc b/hbase-native-client/serde/client-serializer.cc
new file mode 100644
index 0000000..881b6e4
--- /dev/null
+++ b/hbase-native-client/serde/client-serializer.cc
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "serde/client-serializer.h"
+
+#include <folly/io/Cursor.h>
+#include <folly/Logging.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+#include <cstdint>
+#include <limits>
+
+#include "if/HBase.pb.h"
+#include "if/RPC.pb.h"
+
+using namespace hbase;
+
+using folly::IOBuf;
+using folly::io::RWPrivateCursor;
+using google::protobuf::Message;
+using google::protobuf::io::ArrayOutputStream;
+using google::protobuf::io::CodedOutputStream;
+using google::protobuf::io::ZeroCopyOutputStream;
+using std::string;
+using std::unique_ptr;
+
+// Magic bytes that open every connection.
+static const std::string PREAMBLE = "HBas";
+// Service name placed in the connection header.
+static const std::string INTERFACE = "ClientService";
+// Version byte written right after the magic bytes.
+static const uint8_t RPC_VERSION = 0;
+// Auth byte written after the version unless auth_type_ is changed.
+static const uint8_t DEFAULT_AUTH_TYPE = 80;
+
+ClientSerializer::ClientSerializer() : auth_type_(DEFAULT_AUTH_TYPE) {}
+
+/**
+ * Build the connection preamble: the magic bytes, then one version byte and
+ * one auth-type byte.
+ */
+unique_ptr<IOBuf> ClientSerializer::preamble() {
+  // Copy the magic bytes, reserving two bytes of tailroom for what follows.
+  auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2);
+  // Make the two reserved bytes part of the buffer's data.
+  magic->append(2);
+  RWPrivateCursor c(magic.get());
+  // Step over the magic bytes that copyBuffer already wrote.
+  c.skip(PREAMBLE.size());
+  // Version
+  c.write(RPC_VERSION);
+  // Standard security aka Please don't lie to me.
+  c.write(auth_type_);
+  return magic;
+}
+
+/**
+ * Build the connection header for the given user: a serialized
+ * ConnectionHeader prefixed with its 4 byte big-endian length.
+ */
+unique_ptr<IOBuf> ClientSerializer::header(const string &user) {
+  pb::ConnectionHeader h;
+
+  // TODO(eclark): Make this not a total lie.
+  h.mutable_user_info()->set_effective_user(user);
+  // The service name that we want to talk to.
+  //
+  // Right now we're completely ignoring the service interface.
+  // That may or may not be the correct thing to do.
+  // It worked for a while with the java client; until it
+  // didn't.
+  h.set_service_name(INTERFACE);
+  return prepend_length(serialize_message(h));
+}
+
+/**
+ * Build one request frame: a delimited RequestHeader, followed by the
+ * delimited request message when msg is not null, all prefixed with the
+ * 4 byte big-endian length of what follows.
+ */
+unique_ptr<IOBuf> ClientSerializer::request(const uint32_t call_id,
+                                            const string &method,
+                                            const Message *msg) {
+  pb::RequestHeader rq;
+  rq.set_method_name(method);
+  rq.set_call_id(call_id);
+  // Tells the reader whether a request message follows the header.
+  rq.set_request_param(msg != nullptr);
+  auto ser_header = serialize_delimited(rq);
+  if (msg != nullptr) {
+    auto ser_req = serialize_delimited(*msg);
+    ser_header->appendChain(std::move(ser_req));
+  }
+
+  return prepend_length(std::move(ser_header));
+}
+
+/**
+ * Put a 4 byte big-endian length in front of msg. The length covers every
+ * buffer in msg's chain; the result is the length buffer chained to msg.
+ */
+unique_ptr<IOBuf> ClientSerializer::prepend_length(unique_ptr<IOBuf> msg) {
+  // The length prefix is a 4 byte integer. So create a buffer that large
+  auto len_buf = IOBuf::create(4);
+  // Then make those bytes visible.
+  len_buf->append(4);
+
+  RWPrivateCursor c(len_buf.get());
+  // Get the size of the data to be pushed out the network.
+  auto size = msg->computeChainDataLength();
+  // The prefix only has room for 32 bits; anything larger would be silently
+  // truncated by the cast below.
+  DCHECK_LE(size, std::numeric_limits<uint32_t>::max());
+
+  // Write the length to this IOBuf.
+  c.writeBE(static_cast<uint32_t>(size));
+
+  // Then attach the original to the back of len_buf
+  len_buf->appendChain(std::move(msg));
+  return len_buf;
+}
+
+/**
+ * Serialize msg into a single buffer as a varint32 size followed by the
+ * message bytes.
+ */
+unique_ptr<IOBuf> ClientSerializer::serialize_delimited(const Message &msg) {
+  // Get the buffer size needed for just the message.
+  int msg_size = msg.ByteSize();
+  int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size;
+
+  // Create a buffer big enough to hold the varint and the object.
+  auto buf = IOBuf::create(buf_size);
+  buf->append(buf_size);
+
+  // Create the array output stream.
+  ArrayOutputStream aos{buf->writableData(), static_cast<int>(buf->length())};
+  // Wrap the ArrayOutputStream in the coded output stream to allow writing
+  // Varint32
+  CodedOutputStream cos{&aos};
+
+  // Write out the size.
+  cos.WriteVarint32(msg_size);
+
+  // Now write the rest out.
+  // We're using the protobuf output streams here to keep track
+  // of where in the output array we are rather than IOBuf.
+  uint8_t *dest = cos.GetDirectBufferForNBytesAndAdvance(msg_size);
+  // The buffer was sized for exactly varint + message, so space must exist.
+  DCHECK(dest != nullptr);
+  msg.SerializeWithCachedSizesToArray(dest);
+
+  // Return the buffer.
+  return buf;
+}
+
+/**
+ * Serialize msg, without any length prefix, into a single buffer.
+ *
+ * The bytes are written straight into the IOBuf, avoiding the intermediate
+ * std::string and the second copy it required.
+ */
+unique_ptr<IOBuf> ClientSerializer::serialize_message(const Message &msg) {
+  // ByteSize() also caches the sizes that the serialize call below relies on.
+  const int msg_size = msg.ByteSize();
+
+  auto buf = IOBuf::create(msg_size);
+  buf->append(msg_size);
+  msg.SerializeWithCachedSizesToArray(buf->writableData());
+  return buf;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/dc0f0175/hbase-native-client/serde/client-serializer.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-serializer.h b/hbase-native-client/serde/client-serializer.h
new file mode 100644
index 0000000..685095d
--- /dev/null
+++ b/hbase-native-client/serde/client-serializer.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/io/IOBuf.h>
+#include <cstdint>
+#include <memory>
+#include <string>
+
+// Forward
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
+namespace hbase {
+class Request;
+}
+
+namespace hbase {
+
+/**
+ * Turns connection setup data and protobuf messages into folly IOBufs ready
+ * to be written to the wire.
+ */
+class ClientSerializer {
+public:
+  ClientSerializer();
+
+  /**
+   * Connection preamble: the magic bytes followed by one version byte and
+   * the auth_type_ byte.
+   */
+  std::unique_ptr<folly::IOBuf> preamble();
+
+  /**
+   * Length-prefixed connection header naming user as the effective user.
+   */
+  std::unique_ptr<folly::IOBuf> header(const std::string &user);
+
+  /**
+   * Length-prefixed request frame: a delimited request header carrying
+   * call_id and method, followed by the delimited msg when msg is not null.
+   */
+  std::unique_ptr<folly::IOBuf> request(const uint32_t call_id,
+                                        const std::string &method,
+                                        const google::protobuf::Message *msg);
+
+  /**
+   * Serialize msg as a varint32 size followed by the message bytes.
+   */
+  std::unique_ptr<folly::IOBuf>
+  serialize_delimited(const google::protobuf::Message &msg);
+
+  /**
+   * Serialize msg with no length information in front of it.
+   */
+  std::unique_ptr<folly::IOBuf>
+  serialize_message(const google::protobuf::Message &msg);
+
+  /**
+   * Chain a 4 byte big-endian length, covering all of msg, in front of msg.
+   */
+  std::unique_ptr<folly::IOBuf>
+  prepend_length(std::unique_ptr<folly::IOBuf> msg);
+
+  // Auth byte written by preamble(); set to the default by the constructor.
+  uint8_t auth_type_;
+};
+} // namespace hbase