[07/50] [abbrv] hadoop git commit: libhdfs++: Get rid of lock in RpcConnectionImpl destructor. Contributed by James Clampffer
libhdfs++: Get rid of lock in RpcConnectionImpl destructor. Contributed by James Clampffer Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0d26996a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0d26996a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0d26996a Branch: refs/heads/HDFS-12996 Commit: 0d26996a10a45fbf88bb02a0b90a13d52a6c Parents: e45df4e Author: JamesAuthored: Fri Oct 14 10:13:24 2016 -0400 Committer: Hanisha Koneru Committed: Mon Mar 26 11:11:03 2018 -0700 -- .../native/libhdfspp/lib/rpc/rpc_connection.h | 75 ++-- .../native/libhdfspp/tests/rpc_engine_test.cc | 6 +- 2 files changed, 41 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d26996a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 869be40..a6a07c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -34,9 +34,11 @@ namespace hdfs { -template +template class RpcConnectionImpl : public RpcConnection { public: + MEMCHECKED_CLASS(RpcConnectionImpl); + RpcConnectionImpl(RpcEngine *engine); virtual ~RpcConnectionImpl() override; @@ -55,7 +57,7 @@ public: virtual void FlushPendingRequests() override; - NextLayer _layer() { return next_layer_; } + Socket _get_mutable_socket() { return socket_; } void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; } @@ -63,35 +65,34 @@ public: const Options options_; ::asio::ip::tcp::endpoint current_endpoint_; std::vector<::asio::ip::tcp::endpoint> additional_endpoints_; - NextLayer next_layer_; + Socket socket_; ::asio::deadline_timer connect_timer_; void ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint ); }; -template -RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) +template +RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) : RpcConnection(engine), options_(engine->options()), - next_layer_(engine->io_service()), + socket_(engine->io_service()), connect_timer_(engine->io_service()) { LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); } -template -RpcConnectionImpl::~RpcConnectionImpl() { +template +RpcConnectionImpl::~RpcConnectionImpl() { LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this); - std::lock_guard state_lock(connection_state_lock_); if (pending_requests_.size() > 0) LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue"); if (requests_on_fly_.size() > 0) LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); } -template -void RpcConnectionImpl::Connect( +template +void RpcConnectionImpl::Connect( const std::vector<::asio::ip::tcp::endpoint> , const AuthInfo & auth_info, RpcCallback ) { @@ -109,8 +110,8 @@ void RpcConnectionImpl::Connect( this->ConnectAndFlush(server); // need "this" so compiler can infer type of CAF } -template -void RpcConnectionImpl::ConnectAndFlush( +template +void RpcConnectionImpl::ConnectAndFlush( const std::vector<::asio::ip::tcp::endpoint> ) { LOG_INFO(kRPC, << "ConnectAndFlush called"); @@ -139,7 +140,7 @@ void RpcConnectionImpl::ConnectAndFlush( current_endpoint_ = first_endpoint; auto shared_this = shared_from_this(); - next_layer_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code ) { + socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code ) { ConnectComplete(ec, first_endpoint); }); @@ -155,9 +156,9 @@ void RpcConnectionImpl::ConnectAndFlush( }); } -template -void RpcConnectionImpl::ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint & remote) { - auto shared_this = RpcConnectionImpl::shared_from_this(); +template +void RpcConnectionImpl::ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint & remote) { + auto shared_this = RpcConnectionImpl::shared_from_this(); std::lock_guard state_lock(connection_state_lock_); connect_timer_.cancel(); @@ -190,7 +191,7 @@
[18/50] [abbrv] hadoop git commit: libhdfs++: Get rid of lock in RpcConnectionImpl destructor. Contributed by James Clampffer
libhdfs++: Get rid of lock in RpcConnectionImpl destructor. Contributed by James Clampffer Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6dd47cae Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6dd47cae Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6dd47cae Branch: refs/heads/HDFS-7240 Commit: 6dd47cae8655f92ab2ae2729c0c08738db598405 Parents: 7ebecae Author: JamesAuthored: Fri Oct 14 10:13:24 2016 -0400 Committer: James Clampffer Committed: Thu Mar 22 17:19:47 2018 -0400 -- .../native/libhdfspp/lib/rpc/rpc_connection.h | 75 ++-- .../native/libhdfspp/tests/rpc_engine_test.cc | 6 +- 2 files changed, 41 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dd47cae/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 869be40..a6a07c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -34,9 +34,11 @@ namespace hdfs { -template +template class RpcConnectionImpl : public RpcConnection { public: + MEMCHECKED_CLASS(RpcConnectionImpl); + RpcConnectionImpl(RpcEngine *engine); virtual ~RpcConnectionImpl() override; @@ -55,7 +57,7 @@ public: virtual void FlushPendingRequests() override; - NextLayer _layer() { return next_layer_; } + Socket _get_mutable_socket() { return socket_; } void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; } @@ -63,35 +65,34 @@ public: const Options options_; ::asio::ip::tcp::endpoint current_endpoint_; std::vector<::asio::ip::tcp::endpoint> additional_endpoints_; - NextLayer next_layer_; + Socket socket_; ::asio::deadline_timer connect_timer_; void ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint ); }; -template -RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) +template +RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) : RpcConnection(engine), options_(engine->options()), - next_layer_(engine->io_service()), + socket_(engine->io_service()), connect_timer_(engine->io_service()) { LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); } -template -RpcConnectionImpl::~RpcConnectionImpl() { +template +RpcConnectionImpl::~RpcConnectionImpl() { LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this); - std::lock_guard state_lock(connection_state_lock_); if (pending_requests_.size() > 0) LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue"); if (requests_on_fly_.size() > 0) LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); } -template -void RpcConnectionImpl::Connect( +template +void RpcConnectionImpl::Connect( const std::vector<::asio::ip::tcp::endpoint> , const AuthInfo & auth_info, RpcCallback ) { @@ -109,8 +110,8 @@ void RpcConnectionImpl::Connect( this->ConnectAndFlush(server); // need "this" so compiler can infer type of CAF } -template -void RpcConnectionImpl::ConnectAndFlush( +template +void RpcConnectionImpl::ConnectAndFlush( const std::vector<::asio::ip::tcp::endpoint> ) { LOG_INFO(kRPC, << "ConnectAndFlush called"); @@ -139,7 +140,7 @@ void RpcConnectionImpl::ConnectAndFlush( current_endpoint_ = first_endpoint; auto shared_this = shared_from_this(); - next_layer_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code ) { + socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code ) { ConnectComplete(ec, first_endpoint); }); @@ -155,9 +156,9 @@ void RpcConnectionImpl::ConnectAndFlush( }); } -template -void RpcConnectionImpl::ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint & remote) { - auto shared_this = RpcConnectionImpl::shared_from_this(); +template +void RpcConnectionImpl::ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint & remote) { + auto shared_this = RpcConnectionImpl::shared_from_this(); std::lock_guard state_lock(connection_state_lock_); connect_timer_.cancel(); @@ -190,7 +191,7 @@
[24/52] [abbrv] hadoop git commit: libhdfs++: Get rid of lock in RpcConnectionImpl destructor. Contributed by James Clampffer
libhdfs++: Get rid of lock in RpcConnectionImpl destructor. Contributed by James Clampffer Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6dd47cae Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6dd47cae Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6dd47cae Branch: refs/heads/trunk Commit: 6dd47cae8655f92ab2ae2729c0c08738db598405 Parents: 7ebecae Author: JamesAuthored: Fri Oct 14 10:13:24 2016 -0400 Committer: James Clampffer Committed: Thu Mar 22 17:19:47 2018 -0400 -- .../native/libhdfspp/lib/rpc/rpc_connection.h | 75 ++-- .../native/libhdfspp/tests/rpc_engine_test.cc | 6 +- 2 files changed, 41 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dd47cae/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 869be40..a6a07c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -34,9 +34,11 @@ namespace hdfs { -template +template class RpcConnectionImpl : public RpcConnection { public: + MEMCHECKED_CLASS(RpcConnectionImpl); + RpcConnectionImpl(RpcEngine *engine); virtual ~RpcConnectionImpl() override; @@ -55,7 +57,7 @@ public: virtual void FlushPendingRequests() override; - NextLayer _layer() { return next_layer_; } + Socket _get_mutable_socket() { return socket_; } void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; } @@ -63,35 +65,34 @@ public: const Options options_; ::asio::ip::tcp::endpoint current_endpoint_; std::vector<::asio::ip::tcp::endpoint> additional_endpoints_; - NextLayer next_layer_; + Socket socket_; ::asio::deadline_timer connect_timer_; void ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint ); }; -template -RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) +template +RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) : RpcConnection(engine), options_(engine->options()), - next_layer_(engine->io_service()), + socket_(engine->io_service()), connect_timer_(engine->io_service()) { LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); } -template -RpcConnectionImpl::~RpcConnectionImpl() { +template +RpcConnectionImpl::~RpcConnectionImpl() { LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this); - std::lock_guard state_lock(connection_state_lock_); if (pending_requests_.size() > 0) LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue"); if (requests_on_fly_.size() > 0) LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); } -template -void RpcConnectionImpl::Connect( +template +void RpcConnectionImpl::Connect( const std::vector<::asio::ip::tcp::endpoint> , const AuthInfo & auth_info, RpcCallback ) { @@ -109,8 +110,8 @@ void RpcConnectionImpl::Connect( this->ConnectAndFlush(server); // need "this" so compiler can infer type of CAF } -template -void RpcConnectionImpl::ConnectAndFlush( +template +void RpcConnectionImpl::ConnectAndFlush( const std::vector<::asio::ip::tcp::endpoint> ) { LOG_INFO(kRPC, << "ConnectAndFlush called"); @@ -139,7 +140,7 @@ void RpcConnectionImpl::ConnectAndFlush( current_endpoint_ = first_endpoint; auto shared_this = shared_from_this(); - next_layer_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code ) { + socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code ) { ConnectComplete(ec, first_endpoint); }); @@ -155,9 +156,9 @@ void RpcConnectionImpl::ConnectAndFlush( }); } -template -void RpcConnectionImpl::ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint & remote) { - auto shared_this = RpcConnectionImpl::shared_from_this(); +template +void RpcConnectionImpl::ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint & remote) { + auto shared_this = RpcConnectionImpl::shared_from_this(); std::lock_guard state_lock(connection_state_lock_); connect_timer_.cancel(); @@ -190,7 +191,7 @@ void
[24/52] [abbrv] hadoop git commit: libhdfs++: Get rid of lock in RpcConnectionImpl destructor. Contributed by James Clampffer
libhdfs++: Get rid of lock in RpcConnectionImpl destructor. Contributed by James Clampffer Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6dd47cae Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6dd47cae Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6dd47cae Branch: refs/heads/HDFS-8707 Commit: 6dd47cae8655f92ab2ae2729c0c08738db598405 Parents: 7ebecae Author: JamesAuthored: Fri Oct 14 10:13:24 2016 -0400 Committer: James Clampffer Committed: Thu Mar 22 17:19:47 2018 -0400 -- .../native/libhdfspp/lib/rpc/rpc_connection.h | 75 ++-- .../native/libhdfspp/tests/rpc_engine_test.cc | 6 +- 2 files changed, 41 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dd47cae/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 869be40..a6a07c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -34,9 +34,11 @@ namespace hdfs { -template +template class RpcConnectionImpl : public RpcConnection { public: + MEMCHECKED_CLASS(RpcConnectionImpl); + RpcConnectionImpl(RpcEngine *engine); virtual ~RpcConnectionImpl() override; @@ -55,7 +57,7 @@ public: virtual void FlushPendingRequests() override; - NextLayer _layer() { return next_layer_; } + Socket _get_mutable_socket() { return socket_; } void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; } @@ -63,35 +65,34 @@ public: const Options options_; ::asio::ip::tcp::endpoint current_endpoint_; std::vector<::asio::ip::tcp::endpoint> additional_endpoints_; - NextLayer next_layer_; + Socket socket_; ::asio::deadline_timer connect_timer_; void ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint ); }; -template -RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) +template +RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) : RpcConnection(engine), options_(engine->options()), - next_layer_(engine->io_service()), + socket_(engine->io_service()), connect_timer_(engine->io_service()) { LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); } -template -RpcConnectionImpl::~RpcConnectionImpl() { +template +RpcConnectionImpl::~RpcConnectionImpl() { LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this); - std::lock_guard state_lock(connection_state_lock_); if (pending_requests_.size() > 0) LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue"); if (requests_on_fly_.size() > 0) LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); } -template -void RpcConnectionImpl::Connect( +template +void RpcConnectionImpl::Connect( const std::vector<::asio::ip::tcp::endpoint> , const AuthInfo & auth_info, RpcCallback ) { @@ -109,8 +110,8 @@ void RpcConnectionImpl::Connect( this->ConnectAndFlush(server); // need "this" so compiler can infer type of CAF } -template -void RpcConnectionImpl::ConnectAndFlush( +template +void RpcConnectionImpl::ConnectAndFlush( const std::vector<::asio::ip::tcp::endpoint> ) { LOG_INFO(kRPC, << "ConnectAndFlush called"); @@ -139,7 +140,7 @@ void RpcConnectionImpl::ConnectAndFlush( current_endpoint_ = first_endpoint; auto shared_this = shared_from_this(); - next_layer_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code ) { + socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code ) { ConnectComplete(ec, first_endpoint); }); @@ -155,9 +156,9 @@ void RpcConnectionImpl::ConnectAndFlush( }); } -template -void RpcConnectionImpl::ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint & remote) { - auto shared_this = RpcConnectionImpl::shared_from_this(); +template +void RpcConnectionImpl::ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint & remote) { + auto shared_this = RpcConnectionImpl::shared_from_this(); std::lock_guard state_lock(connection_state_lock_); connect_timer_.cancel(); @@ -190,7 +191,7 @@
[22/50] [abbrv] hadoop git commit: libhdfs++: Get rid of lock in RpcConnectionImpl destructor. Contributed by James Clampffer
libhdfs++: Get rid of lock in RpcConnectionImpl destructor. Contributed by James Clampffer Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0ae6b12c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0ae6b12c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0ae6b12c Branch: refs/heads/HDFS-8707 Commit: 0ae6b12c325c4f909315290c296f8f05fd814471 Parents: 4b86fcf Author: JamesAuthored: Fri Oct 14 10:13:24 2016 -0400 Committer: James Clampffer Committed: Thu Mar 22 16:20:45 2018 -0400 -- .../native/libhdfspp/lib/rpc/rpc_connection.h | 75 ++-- .../native/libhdfspp/tests/rpc_engine_test.cc | 6 +- 2 files changed, 41 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ae6b12c/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 869be40..a6a07c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -34,9 +34,11 @@ namespace hdfs { -template +template class RpcConnectionImpl : public RpcConnection { public: + MEMCHECKED_CLASS(RpcConnectionImpl); + RpcConnectionImpl(RpcEngine *engine); virtual ~RpcConnectionImpl() override; @@ -55,7 +57,7 @@ public: virtual void FlushPendingRequests() override; - NextLayer _layer() { return next_layer_; } + Socket _get_mutable_socket() { return socket_; } void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; } @@ -63,35 +65,34 @@ public: const Options options_; ::asio::ip::tcp::endpoint current_endpoint_; std::vector<::asio::ip::tcp::endpoint> additional_endpoints_; - NextLayer next_layer_; + Socket socket_; ::asio::deadline_timer connect_timer_; void ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint ); }; -template -RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) +template +RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) : RpcConnection(engine), options_(engine->options()), - next_layer_(engine->io_service()), + socket_(engine->io_service()), connect_timer_(engine->io_service()) { LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); } -template -RpcConnectionImpl::~RpcConnectionImpl() { +template +RpcConnectionImpl::~RpcConnectionImpl() { LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this); - std::lock_guard state_lock(connection_state_lock_); if (pending_requests_.size() > 0) LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue"); if (requests_on_fly_.size() > 0) LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); } -template -void RpcConnectionImpl::Connect( +template +void RpcConnectionImpl::Connect( const std::vector<::asio::ip::tcp::endpoint> , const AuthInfo & auth_info, RpcCallback ) { @@ -109,8 +110,8 @@ void RpcConnectionImpl::Connect( this->ConnectAndFlush(server); // need "this" so compiler can infer type of CAF } -template -void RpcConnectionImpl::ConnectAndFlush( +template +void RpcConnectionImpl::ConnectAndFlush( const std::vector<::asio::ip::tcp::endpoint> ) { LOG_INFO(kRPC, << "ConnectAndFlush called"); @@ -139,7 +140,7 @@ void RpcConnectionImpl::ConnectAndFlush( current_endpoint_ = first_endpoint; auto shared_this = shared_from_this(); - next_layer_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code ) { + socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code ) { ConnectComplete(ec, first_endpoint); }); @@ -155,9 +156,9 @@ void RpcConnectionImpl::ConnectAndFlush( }); } -template -void RpcConnectionImpl::ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint & remote) { - auto shared_this = RpcConnectionImpl::shared_from_this(); +template +void RpcConnectionImpl::ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint & remote) { + auto shared_this = RpcConnectionImpl::shared_from_this(); std::lock_guard state_lock(connection_state_lock_); connect_timer_.cancel(); @@ -190,7 +191,7 @@
hadoop git commit: libhdfs++: Get rid of lock in RpcConnectionImpl destructor. Contributed by James Clampffer
Repository: hadoop Updated Branches: refs/heads/HDFS-8707 467114d39 -> ba51b7cf9 libhdfs++: Get rid of lock in RpcConnectionImpl destructor. Contributed by James Clampffer Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ba51b7cf Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ba51b7cf Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ba51b7cf Branch: refs/heads/HDFS-8707 Commit: ba51b7cf9040231553bc6d60a21d8adf8c85640d Parents: 467114d Author: JamesAuthored: Fri Oct 14 10:13:24 2016 -0400 Committer: James Committed: Fri Oct 14 10:13:24 2016 -0400 -- .../native/libhdfspp/lib/rpc/rpc_connection.h | 75 ++-- .../native/libhdfspp/tests/rpc_engine_test.cc | 6 +- 2 files changed, 41 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba51b7cf/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 869be40..a6a07c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -34,9 +34,11 @@ namespace hdfs { -template +template class RpcConnectionImpl : public RpcConnection { public: + MEMCHECKED_CLASS(RpcConnectionImpl); + RpcConnectionImpl(RpcEngine *engine); virtual ~RpcConnectionImpl() override; @@ -55,7 +57,7 @@ public: virtual void FlushPendingRequests() override; - NextLayer _layer() { return next_layer_; } + Socket _get_mutable_socket() { return socket_; } void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; } @@ -63,35 +65,34 @@ public: const Options options_; ::asio::ip::tcp::endpoint current_endpoint_; std::vector<::asio::ip::tcp::endpoint> additional_endpoints_; - NextLayer next_layer_; + Socket socket_; ::asio::deadline_timer connect_timer_; void ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint ); }; -template -RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) +template +RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) : RpcConnection(engine), options_(engine->options()), - next_layer_(engine->io_service()), + socket_(engine->io_service()), connect_timer_(engine->io_service()) { LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); } -template -RpcConnectionImpl::~RpcConnectionImpl() { +template +RpcConnectionImpl::~RpcConnectionImpl() { LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this); - std::lock_guard state_lock(connection_state_lock_); if (pending_requests_.size() > 0) LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue"); if (requests_on_fly_.size() > 0) LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); } -template -void RpcConnectionImpl::Connect( +template +void RpcConnectionImpl::Connect( const std::vector<::asio::ip::tcp::endpoint> , const AuthInfo & auth_info, RpcCallback ) { @@ -109,8 +110,8 @@ void RpcConnectionImpl::Connect( this->ConnectAndFlush(server); // need "this" so compiler can infer type of CAF } -template -void RpcConnectionImpl::ConnectAndFlush( +template +void RpcConnectionImpl::ConnectAndFlush( const std::vector<::asio::ip::tcp::endpoint> ) { LOG_INFO(kRPC, << "ConnectAndFlush called"); @@ -139,7 +140,7 @@ void RpcConnectionImpl::ConnectAndFlush( current_endpoint_ = first_endpoint; auto shared_this = shared_from_this(); - next_layer_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code ) { + socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code ) { ConnectComplete(ec, first_endpoint); }); @@ -155,9 +156,9 @@ void RpcConnectionImpl::ConnectAndFlush( }); } -template -void RpcConnectionImpl::ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint & remote) { - auto shared_this = RpcConnectionImpl::shared_from_this(); +template +void RpcConnectionImpl::ConnectComplete(const ::asio::error_code , const ::asio::ip::tcp::endpoint & remote) { + auto shared_this = RpcConnectionImpl::shared_from_this(); std::lock_guard