http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h
index 8e579a2..1dd43af 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h
@@ -26,6 +26,7 @@
 #include "common/logging.h"
 #include "common/util.h"
 #include "common/libhdfs_events_impl.h"
+#include "hdfspp/ioservice.h"
 
 #include <asio/connect.hpp>
 #include <asio/read.hpp>
@@ -76,8 +77,8 @@ template <class Socket>
 RpcConnectionImpl<Socket>::RpcConnectionImpl(std::shared_ptr<RpcEngine> engine)
     : RpcConnection(engine),
       options_(engine->options()),
-      socket_(engine->io_service()),
-      connect_timer_(engine->io_service())
+      socket_(engine->io_service()->GetRaw()),
+      connect_timer_(engine->io_service()->GetRaw())
 {
       LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << 
(void*)this);
 }
@@ -353,7 +354,7 @@ void RpcConnectionImpl<Socket>::FlushPendingRequests() {
                         OnSendCompleted(ec, size);
                       });
   } else {  // Nothing to send for this request, inform the handler immediately
-    ::asio::io_service *service = GetIoService();
+    std::shared_ptr<IoService> service = GetIoService();
     if(!service) {
       LOG_ERROR(kRPC, << "RpcConnectionImpl@" << this << " attempted to access 
null IoService");
       // No easy way to bail out of this context, but the only way to get here 
is when
@@ -361,7 +362,7 @@ void RpcConnectionImpl<Socket>::FlushPendingRequests() {
       return;
     }
 
-    service->post(
+    service->PostTask(
         // Never hold locks when calling a callback
         [req]() { req->OnResponseArrived(nullptr, Status::OK()); }
     );

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
index 0ca7c6a..ad6c9b9 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -30,7 +30,7 @@ template <class T>
 using optional = std::experimental::optional<T>;
 
 
-RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
+RpcEngine::RpcEngine(std::shared_ptr<IoService> io_service, const Options 
&options,
                      const std::string &client_name, const std::string 
&user_name,
                      const char *protocol_name, int protocol_version)
     : io_service_(io_service),
@@ -40,7 +40,7 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const 
Options &options,
       protocol_name_(protocol_name),
       protocol_version_(protocol_version),
       call_id_(0),
-      retry_timer(*io_service),
+      retry_timer(io_service->GetRaw()),
       event_handlers_(std::make_shared<LibhdfsEvents>()),
       connect_canceled_(false)
 {
@@ -86,7 +86,7 @@ bool RpcEngine::CancelPendingConnect() {
 
 void RpcEngine::Shutdown() {
   LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called");
-  io_service_->post([this]() {
+  io_service_->PostLambda([this]() {
     std::lock_guard<std::mutex> state_lock(engine_state_lock_);
     conn_.reset();
   });
@@ -154,7 +154,7 @@ void RpcEngine::AsyncRpc(
 
   // In case user-side code isn't checking the status of Connect before doing 
RPC
   if(connect_canceled_) {
-    io_service_->post(
+    io_service_->PostLambda(
         [handler](){ handler(Status::Canceled()); }
     );
     return;
@@ -190,7 +190,7 @@ void RpcEngine::AsyncRpcCommsError(
     std::vector<std::shared_ptr<Request>> pendingRequests) {
   LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; status=\"" << 
status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << 
std::to_string(pendingRequests.size()));
 
-  io_service().post([this, status, failedConnection, pendingRequests]() {
+  io_service_->PostLambda([this, status, failedConnection, pendingRequests]() {
     RpcCommsError(status, failedConnection, pendingRequests);
   });
 }
@@ -238,7 +238,7 @@ void RpcEngine::RpcCommsError(
       //    on.  There might be a good argument for caching the first error
       //    rather than the last one, that gets messy
 
-      io_service().post([req, status]() {
+      io_service()->PostLambda([req, status]() {
         req->OnResponseArrived(nullptr, status);  // Never call back while 
holding a lock
       });
       it = pendingRequests.erase(it);
@@ -283,7 +283,7 @@ void RpcEngine::RpcCommsError(
 
           for(unsigned int i=0; i<pendingRequests.size(); i++) {
             std::shared_ptr<Request> sharedCurrentRequest = pendingRequests[i];
-            io_service().post([sharedCurrentRequest, badEndpointStatus]() {
+            io_service()->PostLambda([sharedCurrentRequest, 
badEndpointStatus]() {
               sharedCurrentRequest->OnResponseArrived(nullptr, 
badEndpointStatus);  // Never call back while holding a lock
             });
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
index 9f45fcf..845eaf5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h
@@ -60,6 +60,7 @@ class RpcConnection;
 class SaslProtocol;
 class RpcConnection;
 class Request;
+class IoService;
 
 /*
  * These methods of the RpcEngine will never acquire locks, and are safe for
@@ -83,7 +84,7 @@ public:
   virtual const std::string &user_name() = 0;
   virtual const std::string &protocol_name() = 0;
   virtual int protocol_version() = 0;
-  virtual ::asio::io_service &io_service() = 0;
+  virtual std::shared_ptr<IoService> io_service() const = 0;
   virtual const Options &options() = 0;
 };
 
@@ -107,7 +108,7 @@ class RpcEngine : public LockFreeRpcEngine, public 
std::enable_shared_from_this<
     kCallIdSasl = -33
   };
 
-  RpcEngine(::asio::io_service *io_service, const Options &options,
+  RpcEngine(std::shared_ptr<IoService> service, const Options &options,
             const std::string &client_name, const std::string &user_name,
             const char *protocol_name, int protocol_version);
 
@@ -145,7 +146,7 @@ class RpcEngine : public LockFreeRpcEngine, public 
std::enable_shared_from_this<
   const std::string &user_name() override { return auth_info_.getUser(); }
   const std::string &protocol_name() override { return protocol_name_; }
   int protocol_version() override { return protocol_version_; }
-  ::asio::io_service &io_service() override { return *io_service_; }
+  std::shared_ptr<IoService> io_service() const override { return io_service_; 
}
   const Options &options() override { return options_; }
   static std::string GetRandomClientName();
 
@@ -162,7 +163,7 @@ protected:
   std::vector<::asio::ip::tcp::endpoint> last_endpoints_;
 
 private:
-  ::asio::io_service * const io_service_;
+  mutable std::shared_ptr<IoService> io_service_;
   const Options options_;
   const std::string client_name_;
   const std::string client_id_;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
index 00bbf3d..23de015 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
@@ -16,11 +16,12 @@
  * limitations under the License.
  */
 
-#include "fs/filesystem.h"
-#include "fs/bad_datanode_tracker.h"
 #include "common/libhdfs_events_impl.h"
-
 #include "common/util.h"
+#include "fs/filesystem.h"
+#include "fs/filehandle.h"
+#include "fs/bad_datanode_tracker.h"
+#include "reader/block_reader.h"
 
 #include <gmock/gmock.h>
 
@@ -54,7 +55,7 @@ public:
     const std::string & client_name,
     const hadoop::hdfs::LocatedBlockProto &block,
     size_t offset,
-    const MutableBuffers &buffers,
+    const MutableBuffer &buffer,
     const std::function<void(const Status &, size_t)> handler));
 
   virtual void CancelOperation() override {
@@ -67,14 +68,14 @@ class MockDNConnection : public DataNodeConnection, public 
std::enable_shared_fr
       handler(Status::OK(), shared_from_this());
     }
 
-  void async_read_some(const MutableBuffers &buf,
+  void async_read_some(const MutableBuffer &buf,
         std::function<void (const asio::error_code & error,
                                std::size_t bytes_transferred) > handler) 
override {
       (void)buf;
       handler(asio::error::fault, 0);
   }
 
-  void async_write_some(const ConstBuffers &buf,
+  void async_write_some(const ConstBuffer &buf,
             std::function<void (const asio::error_code & error,
                                  std::size_t bytes_transferred) > handler) 
override {
       (void)buf;
@@ -101,7 +102,7 @@ protected:
     return mock_reader_;
   }
   std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
-      ::asio::io_service *io_service,
+      std::shared_ptr<IoService> io_service,
       const ::hadoop::hdfs::DatanodeInfoProto & dn,
       const hadoop::common::TokenProto * token) override {
     (void) io_service; (void) dn; (void) token;
@@ -130,12 +131,12 @@ TEST(BadDataNodeTest, TestNoNodes) {
   char buf[4096] = {
       0,
   };
-  IoServiceImpl io_service;
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
   auto bad_node_tracker = std::make_shared<BadDataNodeTracker>();
   auto monitors = std::make_shared<LibhdfsEvents>();
   bad_node_tracker->AddBadNode("foo");
 
-  PartialMockFileHandle is("cluster", "file", &io_service.io_service(), 
GetRandomClientName(), file_info, bad_node_tracker, monitors);
+  PartialMockFileHandle is("cluster", "file", io_service, 
GetRandomClientName(), file_info, bad_node_tracker, monitors);
   Status stat;
   size_t read = 0;
 
@@ -170,7 +171,7 @@ TEST(BadDataNodeTest, NNEventCallback) {
   char buf[4096] = {
       0,
   };
-  IoServiceImpl io_service;
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
   auto tracker = std::make_shared<BadDataNodeTracker>();
 
 
@@ -191,7 +192,7 @@ TEST(BadDataNodeTest, NNEventCallback) {
 
     return event_response::make_ok();
   });
-  PartialMockFileHandle is("cluster", "file", &io_service.io_service(), 
GetRandomClientName(),  file_info, tracker, monitors);
+  PartialMockFileHandle is("cluster", "file", io_service, 
GetRandomClientName(),  file_info, tracker, monitors);
   Status stat;
   size_t read = 0;
 
@@ -234,10 +235,10 @@ TEST(BadDataNodeTest, RecoverableError) {
   char buf[4096] = {
       0,
   };
-  IoServiceImpl io_service;
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
   auto tracker = std::make_shared<BadDataNodeTracker>();
   auto monitors = std::make_shared<LibhdfsEvents>();
-  PartialMockFileHandle is("cluster", "file", &io_service.io_service(), 
GetRandomClientName(),  file_info, tracker, monitors);
+  PartialMockFileHandle is("cluster", "file", io_service, 
GetRandomClientName(),  file_info, tracker, monitors);
   Status stat;
   size_t read = 0;
   EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
@@ -285,10 +286,10 @@ TEST(BadDataNodeTest, InternalError) {
   char buf[4096] = {
       0,
   };
-  IoServiceImpl io_service;
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
   auto tracker = std::make_shared<BadDataNodeTracker>();
   auto monitors = std::make_shared<LibhdfsEvents>();
-  PartialMockFileHandle is("cluster", "file", &io_service.io_service(), 
GetRandomClientName(),  file_info, tracker, monitors);
+  PartialMockFileHandle is("cluster", "file", io_service, 
GetRandomClientName(),  file_info, tracker, monitors);
   Status stat;
   size_t read = 0;
   EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ioservice_test.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ioservice_test.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ioservice_test.cc
index 5ee9789..2fdbd80 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ioservice_test.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ioservice_test.cc
@@ -16,13 +16,15 @@
  * limitations under the License.
  */
 
-#include "common/hdfs_ioservice.h"
+#include "hdfspp/ioservice.h"
 
 #include <future>
 #include <functional>
 #include <thread>
 #include <string>
 
+
+#include <google/protobuf/stubs/common.h>
 #include <gmock/gmock.h>
 
 using ::testing::_;
@@ -34,7 +36,7 @@ using namespace hdfs;
 // Make sure IoService spins up specified number of threads
 TEST(IoServiceTest, InitThreads) {
 #ifndef DISABLE_CONCURRENT_WORKERS
-  std::shared_ptr<IoServiceImpl> service = 
std::static_pointer_cast<IoServiceImpl>(IoService::MakeShared());
+  std::shared_ptr<IoService> service = IoService::MakeShared();
   EXPECT_NE(service, nullptr);
 
   unsigned int thread_count = 4;
@@ -50,7 +52,7 @@ TEST(IoServiceTest, InitThreads) {
 // Make sure IoService defaults to logical thread count
 TEST(IoServiceTest, InitDefaultThreads) {
 #ifndef DISABLE_CONCURRENT_WORKERS
-  std::shared_ptr<IoServiceImpl> service = 
std::static_pointer_cast<IoServiceImpl>(IoService::MakeShared());
+  std::shared_ptr<IoService> service = IoService::MakeShared();
   EXPECT_NE(service, nullptr);
 
   unsigned int thread_count = std::thread::hardware_concurrency();
@@ -66,7 +68,7 @@ TEST(IoServiceTest, InitDefaultThreads) {
 
 // Check IoService::PostTask
 TEST(IoServiceTest, SimplePost) {
-  std::shared_ptr<IoServiceImpl> service = 
std::static_pointer_cast<IoServiceImpl>(IoService::MakeShared());
+  std::shared_ptr<IoService> service = IoService::MakeShared();
   EXPECT_NE(service, nullptr);
 
   unsigned int thread_count = std::thread::hardware_concurrency();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
index cd1fc12..de234ef 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
@@ -49,7 +49,7 @@ public:
   virtual ~MockConnectionBase();
   typedef std::pair<asio::error_code, std::string> ProducerResult;
 
-  void async_read_some(const MutableBuffers &buf,
+  void async_read_some(const MutableBuffer &buf,
           std::function<void (const asio::error_code & error,
                                  std::size_t bytes_transferred) > handler) 
override {
     if (produced_.size() == 0) {
@@ -72,7 +72,7 @@ public:
     io_service_->post(std::bind(handler, asio::error_code(), len));
   }
 
-  void async_write_some(const ConstBuffers &buf,
+  void async_write_some(const ConstBuffer &buf,
             std::function<void (const asio::error_code & error,
                                  std::size_t bytes_transferred) > handler) 
override {
     // CompletionResult res = OnWrite(buf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
index 80127f3..4b909b2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
@@ -69,14 +69,14 @@ public:
   /* event handler to trigger side effects */
   std::function<void(void)> OnRead;
 
-  void async_read_some(const MutableBuffers &buf,
+  void async_read_some(const MutableBuffer &buf,
         std::function<void (const asio::error_code & error,
                                std::size_t bytes_transferred) > handler) 
override {
       this->OnRead();
       this->MockConnectionBase::async_read_some(buf, handler);
   }
 
-  void async_write_some(const ConstBuffers &buf,
+  void async_write_some(const ConstBuffer &buf,
             std::function<void (const asio::error_code & error,
                                  std::size_t bytes_transferred) > handler) 
override {
     this->MockConnectionBase::async_write_some(buf, handler);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
index f998c7f..6bbe725 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+#include "hdfspp/ioservice.h"
+
 #include "mock_connection.h"
 #include "test.pb.h"
 #include "RpcHeader.pb.h"
@@ -23,7 +25,6 @@
 #include "common/namenode_info.h"
 
 #include <google/protobuf/io/coded_stream.h>
-
 #include <gmock/gmock.h>
 
 using ::hadoop::common::RpcResponseHeaderProto;
@@ -104,9 +105,10 @@ static inline std::pair<error_code, string> RpcResponse(
 using namespace hdfs;
 
 TEST(RpcEngineTest, TestRoundTrip) {
-  ::asio::io_service io_service;
+
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
   Options options;
-  std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(&io_service, 
options, "foo", "", "protocol", 1);
+  std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(io_service, 
options, "foo", "", "protocol", 1);
   auto conn =
       std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine);
   conn->TEST_set_connected(true);
@@ -129,20 +131,20 @@ TEST(RpcEngineTest, TestRoundTrip) {
   EchoRequestProto req;
   req.set_message("foo");
   std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
-  engine->AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const 
Status &stat) {
+  engine->AsyncRpc("test", &req, resp, [resp, &complete,io_service](const 
Status &stat) {
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ("foo", resp->message());
     complete = true;
-    io_service.stop();
+    io_service->Stop();
   });
-  io_service.run();
+  io_service->Run();
   ASSERT_TRUE(complete);
 }
 
 TEST(RpcEngineTest, TestConnectionResetAndFail) {
-  ::asio::io_service io_service;
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
   Options options;
-  std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(&io_service, 
options, "foo", "", "protocol", 1);
+  std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(io_service, 
options, "foo", "", "protocol", 1);
   auto conn =
       std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine);
   conn->TEST_set_connected(true);
@@ -164,23 +166,23 @@ TEST(RpcEngineTest, TestConnectionResetAndFail) {
   req.set_message("foo");
   std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
 
-  engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status 
&stat) {
+  engine->AsyncRpc("test", &req, resp, [&complete, io_service](const Status 
&stat) {
     complete = true;
-    io_service.stop();
+    io_service->Stop();
     ASSERT_FALSE(stat.ok());
   });
-  io_service.run();
+  io_service->Run();
   ASSERT_TRUE(complete);
 }
 
 
 TEST(RpcEngineTest, TestConnectionResetAndRecover) {
-  ::asio::io_service io_service;
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
   Options options;
   options.max_rpc_retries = 1;
   options.rpc_retry_delay_ms = 0;
   std::shared_ptr<SharedConnectionEngine> engine
-      = std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", 
"", "protocol", 1);
+      = std::make_shared<SharedConnectionEngine>(io_service, options, "foo", 
"", "protocol", 1);
 
   // Normally determined during RpcEngine::Connect, but in this case options
   // provides enough info to determine policy here.
@@ -206,22 +208,22 @@ TEST(RpcEngineTest, TestConnectionResetAndRecover) {
   req.set_message("foo");
   std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
 
-  engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status 
&stat) {
+  engine->AsyncRpc("test", &req, resp, [&complete, io_service](const Status 
&stat) {
     complete = true;
-    io_service.stop();
+    io_service->Stop();
     ASSERT_TRUE(stat.ok());
   });
-  io_service.run();
+  io_service->Run();
   ASSERT_TRUE(complete);
 }
 
 TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) {
-  ::asio::io_service io_service;
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
   Options options;
   options.max_rpc_retries = 1;
   options.rpc_retry_delay_ms = 1;
   std::shared_ptr<SharedConnectionEngine> engine =
-      std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", 
"", "protocol", 1);
+      std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", 
"protocol", 1);
 
   // Normally determined during RpcEngine::Connect, but in this case options
   // provides enough info to determine policy here.
@@ -246,17 +248,17 @@ TEST(RpcEngineTest, 
TestConnectionResetAndRecoverWithDelay) {
   req.set_message("foo");
   std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
 
-  engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status 
&stat) {
+  engine->AsyncRpc("test", &req, resp, [&complete, io_service](const Status 
&stat) {
     complete = true;
-    io_service.stop();
+    io_service->Stop();
     ASSERT_TRUE(stat.ok());
   });
 
-  ::asio::deadline_timer timer(io_service);
+  ::asio::deadline_timer timer(io_service->GetRaw());
   timer.expires_from_now(std::chrono::hours(100));
   timer.async_wait([](const asio::error_code & err){(void)err; 
ASSERT_FALSE("Timed out"); });
 
-  io_service.run();
+  io_service->Run();
   ASSERT_TRUE(complete);
 }
 
@@ -267,7 +269,7 @@ TEST(RpcEngineTest, TestConnectionFailure)
   SharedMockConnection::SetSharedConnectionData(producer);
 
   // Error and no retry
-  ::asio::io_service io_service;
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
 
   bool complete = false;
 
@@ -275,16 +277,16 @@ TEST(RpcEngineTest, TestConnectionFailure)
   options.max_rpc_retries = 0;
   options.rpc_retry_delay_ms = 0;
   std::shared_ptr<SharedConnectionEngine> engine
-      = std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", 
"", "protocol", 1);
+      = std::make_shared<SharedConnectionEngine>(io_service, options, "foo", 
"", "protocol", 1);
   EXPECT_CALL(*producer, Produce())
       
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset),
 "")));
 
-  engine->Connect("", make_endpoint(), [&complete, &io_service](const Status 
&stat) {
+  engine->Connect("", make_endpoint(), [&complete, io_service](const Status 
&stat) {
     complete = true;
-    io_service.stop();
+    io_service->Stop();
     ASSERT_FALSE(stat.ok());
   });
-  io_service.run();
+  io_service->Run();
   ASSERT_TRUE(complete);
 }
 
@@ -294,7 +296,7 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
   producer->checkProducerForConnect = true;
   SharedMockConnection::SetSharedConnectionData(producer);
 
-  ::asio::io_service io_service;
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
 
   bool complete = false;
 
@@ -302,18 +304,18 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
   options.max_rpc_retries = 2;
   options.rpc_retry_delay_ms = 0;
   std::shared_ptr<SharedConnectionEngine> engine =
-      std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", 
"", "protocol", 1);
+      std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", 
"protocol", 1);
   EXPECT_CALL(*producer, Produce())
       
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset),
 "")))
       
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset),
 "")))
       
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset),
 "")));
 
-  engine->Connect("", make_endpoint(), [&complete, &io_service](const Status 
&stat) {
+  engine->Connect("", make_endpoint(), [&complete, io_service](const Status 
&stat) {
     complete = true;
-    io_service.stop();
+    io_service->Stop();
     ASSERT_FALSE(stat.ok());
   });
-  io_service.run();
+  io_service->Run();
   ASSERT_TRUE(complete);
 }
 
@@ -323,7 +325,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover)
   producer->checkProducerForConnect = true;
   SharedMockConnection::SetSharedConnectionData(producer);
 
-  ::asio::io_service io_service;
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
 
   bool complete = false;
 
@@ -331,29 +333,30 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover)
   options.max_rpc_retries = 1;
   options.rpc_retry_delay_ms = 0;
   std::shared_ptr<SharedConnectionEngine> engine =
-      std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", 
"", "protocol", 1);
+      std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", 
"protocol", 1);
   EXPECT_CALL(*producer, Produce())
       
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset),
 "")))
       .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
       .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
 
-  engine->Connect("", make_endpoint(), [&complete, &io_service](const Status 
&stat) {
+  engine->Connect("", make_endpoint(), [&complete, io_service](const Status 
&stat) {
     complete = true;
-    io_service.stop();
+    io_service->Stop();
     ASSERT_TRUE(stat.ok());
   });
-  io_service.run();
+  io_service->Run();
   ASSERT_TRUE(complete);
 }
 
 TEST(RpcEngineTest, TestEventCallbacks)
 {
-  ::asio::io_service io_service;
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
+
   Options options;
   options.max_rpc_retries = 99;
   options.rpc_retry_delay_ms = 0;
   std::shared_ptr<SharedConnectionEngine> engine =
-      std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", 
"", "protocol", 1);
+      std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", 
"protocol", 1);
 
   // Normally determined during RpcEngine::Connect, but in this case options
   // provides enough info to determine policy here.
@@ -399,17 +402,18 @@ TEST(RpcEngineTest, TestEventCallbacks)
   std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
 
   bool complete = false;
-  engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status 
&stat) {
+  engine->AsyncRpc("test", &req, resp, [&complete, io_service](const Status 
&stat) {
     complete = true;
-    io_service.stop();
+    io_service->Stop();
     ASSERT_TRUE(stat.ok());
   });
 
   // If you're adding event hooks you'll most likely need to update this.
   // It's a brittle test but makes it hard to miss control flow changes in RPC 
retry.
-  for(const auto& m : callbacks)
+  for(const auto& m : callbacks) {
     std::cerr << m << std::endl;
-  io_service.run();
+  }
+  io_service->Run();
   ASSERT_TRUE(complete);
   ASSERT_EQ(9, callbacks.size());
   ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error
@@ -430,7 +434,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
   producer->checkProducerForConnect = true;
   SharedMockConnection::SetSharedConnectionData(producer);
 
-  ::asio::io_service io_service;
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
 
   bool complete = false;
 
@@ -438,31 +442,31 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
   options.max_rpc_retries = 1;
   options.rpc_retry_delay_ms = 1;
   std::shared_ptr<SharedConnectionEngine> engine =
-      std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", 
"", "protocol", 1);
+      std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", 
"protocol", 1);
   EXPECT_CALL(*producer, Produce())
       
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset),
 "")))
       .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
       .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
 
-  engine->Connect("", make_endpoint(), [&complete, &io_service](const Status 
&stat) {
+  engine->Connect("", make_endpoint(), [&complete, io_service](const Status 
&stat) {
     complete = true;
-    io_service.stop();
+    io_service->Stop();
     ASSERT_TRUE(stat.ok());
   });
 
-  ::asio::deadline_timer timer(io_service);
+  ::asio::deadline_timer timer(io_service->GetRaw());
   timer.expires_from_now(std::chrono::hours(100));
   timer.async_wait([](const asio::error_code & err){(void)err; 
ASSERT_FALSE("Timed out"); });
 
-  io_service.run();
+  io_service->Run();
   ASSERT_TRUE(complete);
 }
 
 TEST(RpcEngineTest, TestTimeout) {
-  ::asio::io_service io_service;
+  std::shared_ptr<IoService> io_service = IoService::MakeShared();
   Options options;
   options.rpc_timeout = 1;
-  std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(&io_service, 
options, "foo", "", "protocol", 1);
+  std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(io_service, 
options, "foo", "", "protocol", 1);
   auto conn =
       std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine);
   conn->TEST_set_connected(true);
@@ -481,15 +485,15 @@ TEST(RpcEngineTest, TestTimeout) {
   std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
   engine->AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const 
Status &stat) {
     complete = true;
-    io_service.stop();
+    io_service->Stop();
     ASSERT_FALSE(stat.ok());
   });
 
-  ::asio::deadline_timer timer(io_service);
+  ::asio::deadline_timer timer(io_service->GetRaw());
   timer.expires_from_now(std::chrono::hours(100));
   timer.async_wait([](const asio::error_code & err){(void)err; 
ASSERT_FALSE("Timed out"); });
 
-  io_service.run();
+  io_service->Run();
   ASSERT_TRUE(complete);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to