HDFS-13403: libhdfs++ Use hdfs::IoService object rather than asio::io_service.  
Contributed by James Clampffer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eefe2a14
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eefe2a14
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eefe2a14

Branch: refs/heads/HDFS-7240
Commit: eefe2a147c83dbb62c4021b67d59d3b9f065f890
Parents: 7eb783e
Author: James Clampffer <j...@apache.org>
Authored: Wed Apr 11 10:27:23 2018 -0400
Committer: James Clampffer <j...@apache.org>
Committed: Wed Apr 11 10:27:23 2018 -0400

----------------------------------------------------------------------
 .../native/libhdfspp/include/hdfspp/hdfspp.h    |  53 +------
 .../native/libhdfspp/include/hdfspp/ioservice.h | 140 ++++++++++++++++
 .../native/libhdfspp/lib/bindings/c/hdfs.cc     |   7 +-
 .../native/libhdfspp/lib/common/CMakeLists.txt  |   2 +-
 .../native/libhdfspp/lib/common/async_stream.h  |  13 +-
 .../libhdfspp/lib/common/continuation/asio.h    |   5 -
 .../libhdfspp/lib/common/hdfs_ioservice.cc      | 146 -----------------
 .../libhdfspp/lib/common/hdfs_ioservice.h       |  79 ---------
 .../libhdfspp/lib/common/ioservice_impl.cc      | 159 +++++++++++++++++++
 .../libhdfspp/lib/common/ioservice_impl.h       |  76 +++++++++
 .../main/native/libhdfspp/lib/common/logging.h  |   3 -
 .../libhdfspp/lib/common/namenode_info.cc       |  15 +-
 .../native/libhdfspp/lib/common/namenode_info.h |   8 +-
 .../main/native/libhdfspp/lib/common/util.cc    |  14 +-
 .../src/main/native/libhdfspp/lib/common/util.h |  25 ++-
 .../lib/connection/datanodeconnection.cc        |  27 +++-
 .../lib/connection/datanodeconnection.h         |  26 +--
 .../main/native/libhdfspp/lib/fs/filehandle.cc  |  18 +--
 .../main/native/libhdfspp/lib/fs/filehandle.h   |  12 +-
 .../main/native/libhdfspp/lib/fs/filesystem.cc  |  67 ++++++--
 .../main/native/libhdfspp/lib/fs/filesystem.h   |  66 ++------
 .../libhdfspp/lib/fs/namenode_operations.h      |   4 +-
 .../native/libhdfspp/lib/reader/block_reader.cc |  18 +--
 .../native/libhdfspp/lib/reader/block_reader.h  |  10 +-
 .../native/libhdfspp/lib/reader/datatransfer.h  |   4 +-
 .../libhdfspp/lib/rpc/namenode_tracker.cc       |   2 +-
 .../native/libhdfspp/lib/rpc/namenode_tracker.h |   4 +-
 .../main/native/libhdfspp/lib/rpc/request.cc    |   5 +-
 .../native/libhdfspp/lib/rpc/rpc_connection.h   |   2 +-
 .../libhdfspp/lib/rpc/rpc_connection_impl.cc    |  32 ++--
 .../libhdfspp/lib/rpc/rpc_connection_impl.h     |   9 +-
 .../main/native/libhdfspp/lib/rpc/rpc_engine.cc |  14 +-
 .../main/native/libhdfspp/lib/rpc/rpc_engine.h  |   9 +-
 .../native/libhdfspp/tests/bad_datanode_test.cc |  31 ++--
 .../libhdfspp/tests/hdfs_ioservice_test.cc      |  10 +-
 .../native/libhdfspp/tests/mock_connection.h    |   4 +-
 .../libhdfspp/tests/remote_block_reader_test.cc |   4 +-
 .../native/libhdfspp/tests/rpc_engine_test.cc   | 112 ++++++-------
 38 files changed, 681 insertions(+), 554 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
index 2fdeec9..e68a612 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
@@ -19,6 +19,7 @@
 #define LIBHDFSPP_HDFSPP_H_
 
 #include "hdfspp/options.h"
+#include "hdfspp/ioservice.h"
 #include "hdfspp/status.h"
 #include "hdfspp/events.h"
 #include "hdfspp/block_location.h"
@@ -31,62 +32,10 @@
 
 #include <functional>
 #include <memory>
-#include <set>
-#include <iostream>
 
 namespace hdfs {
 
 /**
- * An IoService manages a queue of asynchronous tasks. All libhdfs++
- * operations are filed against a particular IoService.
- *
- * When an operation is queued into an IoService, the IoService will
- * run the callback handler associated with the operation. Note that
- * the IoService must be stopped before destructing the objects that
- * post the operations.
- *
- * From an implementation point of view the hdfs::IoService provides
- * a thin wrapper over an asio::io_service object so that additional
- * instrumentation and functionality can be added.
- **/
-
-class IoService : public std::enable_shared_from_this<IoService>
-{
- public:
-  static IoService *New();
-  static std::shared_ptr<IoService> MakeShared();
-  virtual ~IoService();
-
-  /**
-   * Start up as many threads as there are logical processors.
-   * Return number of threads created.
-   **/
-  virtual unsigned int InitDefaultWorkers() = 0;
-
-  /**
-   * Initialize with thread_count handler threads.
-   * If thread count is less than one print a log message and default to one thread.
-   * Return number of threads created.
-   **/
-  virtual unsigned int InitWorkers(unsigned int thread_count) = 0;
-
-  /**
-   * Place an item on the execution queue.  Will be invoked from outside of the calling context.
-   **/
-  virtual void PostTask(std::function<void(void)>& asyncTask) = 0;
-
-  /**
-   * Run the asynchronous tasks associated with this IoService.
-   **/
-  virtual void Run() = 0;
-  /**
-   * Stop running asynchronous tasks associated with this IoService.
-   * All worker threads will return as soon as they finish executing their current task.
-   **/
-  virtual void Stop() = 0;
-};
-
-/**
  * A node exclusion rule provides a simple way of testing if the
  * client should attempt to connect to a node based on the node's
  * UUID.  The FileSystem and FileHandle use the BadDataNodeTracker

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/ioservice.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/ioservice.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/ioservice.h
new file mode 100644
index 0000000..9805bad
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/ioservice.h
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * An asio::io_service maintains a queue of asynchronous tasks and invokes them
+ * when they are ready to run.  Async network IO handlers become runnable when
+ * the associated IO operation has completed.  The hdfs::IoService is a thin
+ * wrapper over that object to make it easier to add logging and instrumentation
+ * to tasks that have been queued.
+ *
+ * Lifecycle management:
+ *   -The IoService *shall* outlive any tasks it owns.  Deleting a task
+ *    before it has been run **will** result in dangling reference issues.
+ *   -Dependencies (including transitive dependencies) of pending tasks
+ *    *shall* outlive the task.  Failure to ensure this **will** result in
+ *    dangling reference issues.
+ *     -libhdfs++ uses shared_ptr/weak_ptr heavily as a mechanism to ensure
+ *      liveness of dependencies.
+ *     -refcounted pointers in lambda capture lists have a poor track record
+ *      for ensuring liveness in this library; it's easy to omit them because
+ *      the capture list isn't context aware.  Developers are encouraged to
+ *      write callable classes that explicitly list dependencies.
+ *
+ * Constraints on tasks:
+ *   -Tasks and async callbacks *shall* never do blocking IO or sleep().
+ *    At best this hurts performance by preventing worker threads from doing
+ *    useful work.  It may also cause situations that look like deadlocks
+ *    if the worker thread is stalled for long enough.
+ *   -Tasks and async callbacks *shall* not acquire locks that guard resources
+ *    that might be unavailable for an unknown amount of time.  Lock acquisition
+ *    when accessing shared data structures is acceptable and is often required.
+ *   -Tasks and async callbacks *should* not allow exceptions to escape their
+ *    scope since tasks will be executed on a different stack than where they
+ *    were created.  The exception will be caught by the IoService rather than
+ *    being forwarded to the next task.
+ *   -Tasks and async callbacks *should* not rely on thread local storage for
+ *    ancillary context.  The IoService does not support any sort of thread
+ *    affinity that would guarantee tasks Post()ed from one thread will always
+ *    be executed on the same thread.  Applications that only use a single
+ *    worker thread may use TLS but developers should be mindful that throughput
+ *    can no longer be scaled by adding threads.
+ **/
+#ifndef INCLUDE_HDFSPP_IOSERVICE_H_
+#define INCLUDE_HDFSPP_IOSERVICE_H_
+
+#include <memory>
+
+// forward decl
+namespace asio {
+  class io_service;
+}
+
+namespace hdfs {
+
+// (Un)comment this to determine if issues are due to concurrency or logic faults
+// If tests still fail with concurrency disabled it's most likely a logic bug
+#define DISABLE_CONCURRENT_WORKERS
+
+class IoService : public std::enable_shared_from_this<IoService>
+{
+ public:
+  static IoService *New();
+  static std::shared_ptr<IoService> MakeShared();
+  virtual ~IoService();
+
+  /**
+   * Start up as many threads as there are logical processors.
+   * Return number of threads created.
+   **/
+  virtual unsigned int InitDefaultWorkers() = 0;
+
+  /**
+   * Initialize with thread_count handler threads.
+   * If thread count is less than one print a log message and default to one thread.
+   * Return number of threads created.
+   **/
+  virtual unsigned int InitWorkers(unsigned int thread_count) = 0;
+
+  /**
+   * Add a worker thread to existing pool.
+   * Return true on success, false otherwise.
+   **/
+  virtual bool AddWorkerThread() = 0;
+
+  /**
+   * Return the number of worker threads in use.
+   **/
+  virtual unsigned int GetWorkerThreadCount() = 0;
+
+  /**
+   * Enqueue an item for deferred execution.  Non-blocking.
+   * Task will be invoked from outside of the calling context.
+   **/
+  virtual void PostTask(std::function<void(void)> asyncTask) = 0;
+
+  /**
+   * Provide type erasure for lambdas defined inside the argument list.
+   **/
+  template <typename LambdaInstance>
+  inline void PostLambda(LambdaInstance&& func)
+  {
+    std::function<void(void)> typeEraser = func;
+    this->PostTask(func);
+  }
+
+  /**
+   * Run the asynchronous tasks associated with this IoService.
+   **/
+  virtual void Run() = 0;
+  /**
+   * Stop running asynchronous tasks associated with this IoService.
+   * All worker threads will return as soon as they finish executing their current task.
+   **/
+  virtual void Stop() = 0;
+
+  /**
+   * Access underlying io_service object.  Only to be used in asio library calls.
+   * After HDFS-11884 is complete only tests should need direct access to the asio::io_service.
+   **/
+  virtual asio::io_service& GetRaw() = 0;
+};
+
+
+} // namespace hdfs
+#endif // include guard

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index 9a7c8b4..6b2468f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -17,18 +17,17 @@
  */
 
 #include "hdfspp/hdfspp.h"
+#include "hdfspp/hdfs_ext.h"
 
-#include "fs/filesystem.h"
 #include "common/hdfs_configuration.h"
 #include "common/configuration_loader.h"
 #include "common/logging.h"
+#include "fs/filesystem.h"
+#include "fs/filehandle.h"
 
-#include <hdfs/hdfs.h>
-#include <hdfspp/hdfs_ext.h>
 
 #include <libgen.h>
 #include "limits.h"
-
 #include <string>
 #include <cstring>
 #include <iostream>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
index 5d9e52c..1ab04d3 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
@@ -19,6 +19,6 @@ if(NEED_LINK_DL)
    set(LIB_DL dl)
 endif()
 
-add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc 
options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc 
uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc 
libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc 
content_summary.cc locks.cc config_parser.cc)
+add_library(common_obj OBJECT status.cc sasl_digest_md5.cc ioservice_impl.cc 
options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc 
uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc 
libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc 
content_summary.cc locks.cc config_parser.cc)
 add_library(common $<TARGET_OBJECTS:common_obj> 
$<TARGET_OBJECTS:uriparser2_obj>)
 target_link_libraries(common ${LIB_DL})

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h
index 575904c..efe2e1c 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h
@@ -19,12 +19,15 @@
 #ifndef LIB_COMMON_ASYNC_STREAM_H_
 #define LIB_COMMON_ASYNC_STREAM_H_
 
-#include <asio.hpp>
+#include <asio/buffer.hpp>
+#include <asio/error_code.hpp>
+#include <functional>
 
 namespace hdfs {
 
-typedef asio::mutable_buffers_1 MutableBuffers;
-typedef asio::const_buffers_1   ConstBuffers;
+// Contiguous buffer types
+typedef asio::mutable_buffers_1 MutableBuffer;
+typedef asio::const_buffers_1   ConstBuffer;
 
 /*
  * asio-compatible stream implementation.
@@ -35,11 +38,11 @@ typedef asio::const_buffers_1   ConstBuffers;
  */
 class AsyncStream  {
 public:
-  virtual void async_read_some(const MutableBuffers &buf,
+  virtual void async_read_some(const MutableBuffer &buf,
           std::function<void (const asio::error_code & error,
                                  std::size_t bytes_transferred) > handler) = 0;
 
-  virtual void async_write_some(const ConstBuffers &buf,
+  virtual void async_write_some(const ConstBuffer &buf,
             std::function<void (const asio::error_code & error,
                                  std::size_t bytes_transferred) > handler) = 0;
 };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
index 193358f..0215176 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
@@ -20,13 +20,8 @@
 
 #include "continuation.h"
 #include "common/util.h"
-
 #include "hdfspp/status.h"
-
-#include <asio/connect.hpp>
-#include <asio/read.hpp>
 #include <asio/write.hpp>
-#include <asio/ip/tcp.hpp>
 #include <memory>
 
 namespace hdfs {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc
deleted file mode 100644
index 578b782..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "hdfs_ioservice.h"
-
-#include <thread>
-#include <mutex>
-#include <vector>
-
-#include "common/logging.h"
-
-namespace hdfs {
-
-IoService::~IoService() {}
-
-IoService *IoService::New() {
-  return new IoServiceImpl();
-}
-
-std::shared_ptr<IoService> IoService::MakeShared() {
-  return std::make_shared<IoServiceImpl>();
-}
-
-
-unsigned int IoServiceImpl::InitDefaultWorkers() {
-  LOG_TRACE(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers@" << this << 
" called.");
-  unsigned int logical_thread_count = std::thread::hardware_concurrency();
-#ifndef DISABLE_CONCURRENT_WORKERS
-  if(logical_thread_count < 1) {
-    LOG_WARN(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers did not 
detect any logical processors.  Defaulting to 1 worker thread.");
-  } else {
-    LOG_DEBUG(kRPC, << "IoServiceImpl::InitDefaultWorkers detected " << 
logical_thread_count << " logical threads and will spawn a worker for each.");
-  }
-#else
-  if(logical_thread_count > 0) {
-    LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers: " << 
logical_thread_count << " threads available.  Concurrent workers are disabled 
so 1 worker thread will be used");
-  }
-  logical_thread_count = 1;
-#endif
-  return InitWorkers(logical_thread_count);
-}
-
-unsigned int IoServiceImpl::InitWorkers(unsigned int thread_count) {
-#ifdef DISABLED_CONCURRENT_WORKERS
-  LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitWorkers: " << thread_count 
<< " threads specified but concurrent workers are disabled so 1 will be used");
-  thread_count = 1;
-#endif
-  unsigned int created_threads = 0;
-  for(unsigned int i=0; i<thread_count; i++) {
-    bool created = AddWorkerThread();
-    if(created) {
-      created_threads++;
-    } else {
-      LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers 
failed to create a worker thread");
-    }
-  }
-  if(created_threads != thread_count) {
-    LOG_WARN(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers 
attempted to create "
-                            << thread_count << " but only created " << 
created_threads
-                            << " worker threads.  Make sure this process has 
adequate resources.");
-  }
-  return created_threads;
-}
-
-bool IoServiceImpl::AddWorkerThread() {
-  mutex_guard state_lock(state_lock_);
-  auto async_worker = [this]() {
-    this->ThreadStartHook();
-    this->Run();
-    this->ThreadExitHook();
-  };
-  worker_threads_.push_back(WorkerPtr( new std::thread(async_worker)) );
-  return true;
-}
-
-
-void IoServiceImpl::ThreadStartHook() {
-  mutex_guard state_lock(state_lock_);
-  LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() 
<< " for IoServiceImpl@" << this << " starting");
-}
-
-void IoServiceImpl::ThreadExitHook() {
-  mutex_guard state_lock(state_lock_);
-  LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() 
<< " for IoServiceImpl@" << this << " exiting");
-}
-
-void IoServiceImpl::PostTask(std::function<void(void)>& asyncTask) {
-  io_service_.post(asyncTask);
-}
-
-void IoServiceImpl::WorkerDeleter::operator()(std::thread *t) {
-  // It is far too easy to destroy the filesystem (and thus the threadpool)
-  //     from within one of the worker threads, leading to a deadlock.  Let's
-  //     provide some explicit protection.
-  if(t->get_id() == std::this_thread::get_id()) {
-    LOG_ERROR(kAsyncRuntime, << 
"FileSystemImpl::WorkerDeleter::operator(treadptr="
-                             << t << ") : FATAL: Attempted to destroy a thread 
pool"
-                             "from within a callback of the thread pool!");
-  }
-  t->join();
-  delete t;
-}
-
-// As long as this just forwards to an asio::io_service method it doesn't need 
a lock
-void IoServiceImpl::Run() {
-  // The IoService executes callbacks provided by library users in the context 
of worker threads,
-  // there is no way of preventing those callbacks from throwing but we can at 
least prevent them
-  // from escaping this library and crashing the process.
-
-  // As recommended in 
http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
-  asio::io_service::work work(io_service_);
-  while(true)
-  {
-    try
-    {
-      io_service_.run();
-      break;
-    } catch (const std::exception & e) {
-      LOG_WARN(kFileSystem, << "Unexpected exception in libhdfspp worker 
thread: " << e.what());
-    } catch (...) {
-      LOG_WARN(kFileSystem, << "Unexpected value not derived from 
std::exception in libhdfspp worker thread");
-    }
-  }
-}
-
-unsigned int IoServiceImpl::get_worker_thread_count() {
-  mutex_guard state_lock(state_lock_);
-  return worker_threads_.size();
-}
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h
deleted file mode 100644
index 294252b..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef COMMON_HDFS_IOSERVICE_H_
-#define COMMON_HDFS_IOSERVICE_H_
-
-#include "hdfspp/hdfspp.h"
-
-#include <asio/io_service.hpp>
-#include "common/util.h"
-
-#include <mutex>
-#include <thread>
-
-namespace hdfs {
-
-// Uncomment this to determine if issues are due to concurrency or logic faults
-// If tests still fail with concurrency disabled it's most likely a logic bug
-#define DISABLE_CONCURRENT_WORKERS
-
-/*
- *  A thin wrapper over the asio::io_service with a few extras
- *    -manages it's own worker threads
- *    -some helpers for sharing with multiple modules that need to do async 
work
- */
-
-class IoServiceImpl : public IoService {
- public:
-  IoServiceImpl() {}
-
-  virtual unsigned int InitDefaultWorkers() override;
-  virtual unsigned int InitWorkers(unsigned int thread_count) override;
-  virtual void PostTask(std::function<void(void)>& asyncTask) override;
-  virtual void Run() override;
-  virtual void Stop() override { io_service_.stop(); }
-
-  // Add a single worker thread, in the common case try to avoid this in favor
-  // of Init[Default]Workers. Public for use by tests and rare cases where a
-  // client wants very explicit control of threading for performance reasons
-  // e.g. pinning threads to NUMA nodes.
-  bool AddWorkerThread();
-
-  // Be very careful about using this: HDFS-10241
-  ::asio::io_service &io_service() { return io_service_; }
-  unsigned int get_worker_thread_count();
- private:
-  std::mutex state_lock_;
-  ::asio::io_service io_service_;
-
-  // For doing logging + resource manager updates on thread start/exit
-  void ThreadStartHook();
-  void ThreadExitHook();
-
-  // Support for async worker threads
-  struct WorkerDeleter {
-    void operator()(std::thread *t);
-  };
-  typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
-  std::vector<WorkerPtr> worker_threads_;
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.cc
new file mode 100644
index 0000000..de081ed
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.cc
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ioservice_impl.h"
+
+#include <thread>
+#include <mutex>
+#include <vector>
+
+#include "common/util.h"
+#include "common/logging.h"
+
+
+namespace hdfs {
+
+IoService::~IoService() {}
+
+IoService *IoService::New() {
+  return new IoServiceImpl();
+}
+
+std::shared_ptr<IoService> IoService::MakeShared() {
+  return std::make_shared<IoServiceImpl>();
+}
+
+
+unsigned int IoServiceImpl::InitDefaultWorkers() {
+  LOG_TRACE(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers@" << this << 
" called.");
+  unsigned int logical_thread_count = std::thread::hardware_concurrency();
+#ifndef DISABLE_CONCURRENT_WORKERS
+  if(logical_thread_count < 1) {
+    LOG_WARN(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers did not 
detect any logical processors.  Defaulting to 1 worker thread.");
+  } else {
+    LOG_DEBUG(kRPC, << "IoServiceImpl::InitDefaultWorkers detected " << 
logical_thread_count << " logical threads and will spawn a worker for each.");
+  }
+#else
+  if(logical_thread_count > 0) {
+    LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers: " << 
logical_thread_count << " threads available.  Concurrent workers are disabled 
so 1 worker thread will be used");
+  }
+  logical_thread_count = 1;
+#endif
+  return InitWorkers(logical_thread_count);
+}
+
+unsigned int IoServiceImpl::InitWorkers(unsigned int thread_count) {
+#ifdef DISABLED_CONCURRENT_WORKERS
+  LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitWorkers: " << thread_count 
<< " threads specified but concurrent workers are disabled so 1 will be used");
+  thread_count = 1;
+#endif
+  unsigned int created_threads = 0;
+  for(unsigned int i=0; i<thread_count; i++) {
+    bool created = AddWorkerThread();
+    if(created) {
+      created_threads++;
+    } else {
+      LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers 
failed to create a worker thread");
+    }
+  }
+  if(created_threads != thread_count) {
+    LOG_WARN(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers 
attempted to create "
+                            << thread_count << " but only created " << 
created_threads
+                            << " worker threads.  Make sure this process has 
adequate resources.");
+  }
+  return created_threads;
+}
+
+bool IoServiceImpl::AddWorkerThread() {
+  mutex_guard state_lock(state_lock_);
+  auto async_worker = [this]() {
+    this->ThreadStartHook();
+    this->Run();
+    this->ThreadExitHook();
+  };
+  worker_threads_.push_back(WorkerPtr( new std::thread(async_worker)) );
+  return true;
+}
+
+
+void IoServiceImpl::ThreadStartHook() {
+  mutex_guard state_lock(state_lock_);
+  LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() 
<< " for IoServiceImpl@" << this << " starting");
+}
+
+void IoServiceImpl::ThreadExitHook() {
+  mutex_guard state_lock(state_lock_);
+  LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() 
<< " for IoServiceImpl@" << this << " exiting");
+}
+
+void IoServiceImpl::PostTask(std::function<void(void)> asyncTask) {
+  io_service_.post(asyncTask);
+}
+
+void IoServiceImpl::WorkerDeleter::operator()(std::thread *t) {
+  // It is far too easy to destroy the filesystem (and thus the threadpool)
+  //     from within one of the worker threads, leading to a deadlock.  Let's
+  //     provide some explicit protection.
+  if(t->get_id() == std::this_thread::get_id()) {
+    LOG_ERROR(kAsyncRuntime, << 
"FileSystemImpl::WorkerDeleter::operator(treadptr="
+                             << t << ") : FATAL: Attempted to destroy a thread 
pool"
+                             "from within a callback of the thread pool!");
+  }
+  t->join();
+  delete t;
+}
+
+// As long as this just forwards to an asio::io_service method it doesn't need a lock
+void IoServiceImpl::Run() {
+  // The IoService executes callbacks provided by library users in the context of worker threads,
+  // there is no way of preventing those callbacks from throwing but we can at least prevent them
+  // from escaping this library and crashing the process.
+
+  // As recommended in http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
+  asio::io_service::work work(io_service_);
+  while(true)
+  {
+    try
+    {
+      io_service_.run();
+      break;
+    } catch (const std::exception & e) {
+      LOG_WARN(kFileSystem, << "Unexpected exception in libhdfspp worker 
thread: " << e.what());
+    } catch (...) {
+      LOG_WARN(kFileSystem, << "Caught unexpected value not derived from 
std::exception in libhdfspp worker thread");
+    }
+  }
+}
+
+void IoServiceImpl::Stop() {
+  // Note: This doesn't wait for running operations to stop.
+  io_service_.stop();
+}
+
+asio::io_service& IoServiceImpl::GetRaw() {
+  return io_service_;
+}
+
+unsigned int IoServiceImpl::GetWorkerThreadCount() {
+  mutex_guard state_lock(state_lock_);
+  return worker_threads_.size();
+
+}
+
+
+} // namespace hdfs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.h
new file mode 100644
index 0000000..a29985c
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.h
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMON_HDFS_IOSERVICE_H_
+#define COMMON_HDFS_IOSERVICE_H_
+
+#include "hdfspp/ioservice.h"
+
+#include <asio/io_service.hpp>
+#include "common/new_delete.h"
+
+#include <mutex>
+#include <thread>
+
+namespace hdfs {
+
+/*
+ *  A thin wrapper over the asio::io_service with a few extras
+ *    -manages its own worker threads
+ *    -some helpers for sharing with multiple modules that need to do async 
work
+ */
+
+class IoServiceImpl : public IoService {
+ public:
+  MEMCHECKED_CLASS(IoServiceImpl)
+  IoServiceImpl() {}
+
+  unsigned int InitDefaultWorkers() override;
+  unsigned int InitWorkers(unsigned int thread_count) override;
+  void PostTask(std::function<void(void)> asyncTask) override;
+  void Run() override;
+  void Stop() override;
+  asio::io_service& GetRaw() override;
+
+  // Add a single worker thread, in the common case try to avoid this in favor
+  // of Init[Default]Workers. Public for use by tests and rare cases where a
+  // client wants very explicit control of threading for performance reasons
+  // e.g. pinning threads to NUMA nodes.
+  bool AddWorkerThread() override;
+
+  unsigned int GetWorkerThreadCount() override;
+
+ private:
+  std::mutex state_lock_;
+  ::asio::io_service io_service_;
+
+  // For doing logging + resource manager updates on thread start/exit
+  void ThreadStartHook();
+  void ThreadExitHook();
+
+  // Support for async worker threads
+  struct WorkerDeleter {
+    void operator()(std::thread *t);
+  };
+  typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
+  std::vector<WorkerPtr> worker_threads_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
index 69f9c6e..4e66a93 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h
@@ -23,14 +23,11 @@
 
 #include "hdfspp/log.h"
 
-#include <iostream>
 #include <sstream>
 #include <mutex>
 #include <memory>
 #include <thread>
 
-#include <asio/ip/tcp.hpp>
-
 namespace hdfs {
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
index d29f1e9..a04daf1 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc
@@ -20,11 +20,12 @@
 
 #include "common/util.h"
 #include "common/logging.h"
+#include "hdfspp/ioservice.h"
 
 #include <sstream>
 #include <utility>
 #include <future>
-#include <memory>
+
 
 namespace hdfs {
 
@@ -35,8 +36,6 @@ ResolvedNamenodeInfo& ResolvedNamenodeInfo::operator=(const 
NamenodeInfo &info)
   return *this;
 }
 
-
-
 std::string ResolvedNamenodeInfo::str() const {
   std::stringstream ss;
   ss << "ResolvedNamenodeInfo {nameservice: " << nameservice << ", name: " << 
name << ", uri: " << uri.str();
@@ -58,7 +57,7 @@ std::string ResolvedNamenodeInfo::str() const {
 }
 
 
-bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info) 
{
+bool ResolveInPlace(std::shared_ptr<IoService> ioservice, ResolvedNamenodeInfo 
&info) {
   // this isn't very memory friendly, but if it needs to be called often there 
are bigger issues at hand
   info.endpoints.clear();
   std::vector<ResolvedNamenodeInfo> resolved = BulkResolve(ioservice, {info});
@@ -76,7 +75,7 @@ typedef std::vector<asio::ip::tcp::endpoint> endpoint_vector;
 // RAII wrapper
 class ScopedResolver {
  private:
-  ::asio::io_service *io_service_;
+  std::shared_ptr<IoService> io_service_;
   std::string host_;
   std::string port_;
   ::asio::ip::tcp::resolver::query query_;
@@ -86,8 +85,8 @@ class ScopedResolver {
   // Caller blocks on access if resolution isn't finished
   std::shared_ptr<std::promise<Status>> result_status_;
  public:
-  ScopedResolver(::asio::io_service *service, const std::string &host, const 
std::string &port) :
-        io_service_(service), host_(host), port_(port), query_(host, port), 
resolver_(*io_service_)
+  ScopedResolver(std::shared_ptr<IoService> service, const std::string &host, 
const std::string &port) :
+        io_service_(service), host_(host), port_(port), query_(host, port), 
resolver_(io_service_->GetRaw())
   {
     if(!io_service_)
       LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << " passed 
nullptr to io_service");
@@ -140,7 +139,7 @@ class ScopedResolver {
   }
 };
 
-std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, 
const std::vector<NamenodeInfo> &nodes) {
+std::vector<ResolvedNamenodeInfo> BulkResolve(std::shared_ptr<IoService> 
ioservice, const std::vector<NamenodeInfo> &nodes) {
   std::vector< std::unique_ptr<ScopedResolver> > resolvers;
   resolvers.reserve(nodes.size());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h
index fdee8d7..f43690d 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h
@@ -20,6 +20,7 @@
 #define COMMON_HDFS_NAMENODE_INFO_H_
 
 #include <asio.hpp>
+
 #include <hdfspp/options.h>
 
 #include <string>
@@ -27,6 +28,9 @@
 
 namespace hdfs {
 
+// Forward decl
+class IoService;
+
 // Internal representation of namenode info that keeps track
 // of its endpoints.
 struct ResolvedNamenodeInfo : public NamenodeInfo {
@@ -38,11 +42,11 @@ struct ResolvedNamenodeInfo : public NamenodeInfo {
 
 // Clear endpoints if set and resolve all of them in parallel.
 // Only successful lookups will be placed in the result set.
-std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, 
const std::vector<NamenodeInfo> &nodes);
+std::vector<ResolvedNamenodeInfo> BulkResolve(std::shared_ptr<IoService> 
ioservice, const std::vector<NamenodeInfo> &nodes);
 
 // Clear endpoints, if any, and resolve them again
 // Return true if endpoints were resolved
-bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info);
+bool ResolveInPlace(std::shared_ptr<IoService> ioservice, ResolvedNamenodeInfo 
&info);
 
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
index 375f951..6a07987 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
@@ -19,17 +19,25 @@
 #include "common/util.h"
 #include "common/util_c.h"
 
+#include <google/protobuf/message_lite.h>
 #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
 
 #include <exception>
 #include <sstream>
-#include <iostream>
 #include <iomanip>
 #include <thread>
 
 
 namespace hdfs {
 
+Status ToStatus(const ::asio::error_code &ec) {
+  if (ec) {
+    return Status(ec.value(), ec.message().c_str());
+  } else {
+    return Status::OK();
+  }
+}
+
 bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
                             ::google::protobuf::MessageLite *msg) {
   uint32_t size = 0;
@@ -60,6 +68,10 @@ std::string SerializeDelimitedProtobufMessage(const 
::google::protobuf::MessageL
   return buf;
 }
 
+int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg) {
+  size_t size = msg->ByteSize();
+  return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size;
+}
 
 std::string GetRandomClientName() {
   std::vector<unsigned char>buf(8);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
index be902bd..590ba54 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
@@ -21,16 +21,19 @@
 #include "hdfspp/status.h"
 #include "common/logging.h"
 
-#include <sstream>
 #include <mutex>
 #include <string>
 
 #include <asio/error_code.hpp>
 #include <openssl/rand.h>
-
-#include <google/protobuf/message_lite.h>
 #include <google/protobuf/io/coded_stream.h>
-#include <asio.hpp>
+
+
+namespace google {
+  namespace protobuf {
+    class MessageLite;
+  }
+}
 
 namespace hdfs {
 
@@ -38,20 +41,11 @@ namespace hdfs {
 typedef std::lock_guard<std::mutex> mutex_guard;
 
 
-static inline Status ToStatus(const ::asio::error_code &ec) {
-  if (ec) {
-    return Status(ec.value(), ec.message().c_str());
-  } else {
-    return Status::OK();
-  }
-}
+Status ToStatus(const ::asio::error_code &ec);
 
 // Determine size of buffer that needs to be allocated in order to serialize 
msg
 // in delimited format
-static inline int DelimitedPBMessageSize(const ::google::protobuf::MessageLite 
*msg) {
-  size_t size = msg->ByteSize();
-  return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size;
-}
+int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg);
 
 // Construct msg from the input held in the CodedInputStream
 // return false on failure, otherwise return true
@@ -84,7 +78,6 @@ bool lock_held(T & mutex) {
 std::string SafeDisconnect(asio::ip::tcp::socket *sock);
 
 
-
 // The following helper function is used for classes that look like the 
following:
 //
 // template <typename socket_like_object>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
index 27cd666..4142482 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
@@ -24,14 +24,14 @@ namespace hdfs {
 DataNodeConnection::~DataNodeConnection(){}
 DataNodeConnectionImpl::~DataNodeConnectionImpl(){}
 
-DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service,
-                                                const 
::hadoop::hdfs::DatanodeInfoProto &dn_proto,
-                                                const 
hadoop::common::TokenProto *token,
-                                                LibhdfsEvents *event_handlers) 
: event_handlers_(event_handlers)
+DataNodeConnectionImpl::DataNodeConnectionImpl(std::shared_ptr<IoService> 
io_service,
+                                               const 
::hadoop::hdfs::DatanodeInfoProto &dn_proto,
+                                               const 
hadoop::common::TokenProto *token,
+                                               LibhdfsEvents *event_handlers) 
: event_handlers_(event_handlers)
 {
   using namespace ::asio::ip;
 
-  conn_.reset(new tcp::socket(*io_service));
+  conn_.reset(new tcp::socket(io_service->GetRaw()));
   auto datanode_addr = dn_proto.id();
   endpoints_[0] = tcp::endpoint(address::from_string(datanode_addr.ipaddr()),
                                   datanode_addr.xferport());
@@ -68,5 +68,22 @@ void DataNodeConnectionImpl::Cancel() {
   }
 }
 
+void DataNodeConnectionImpl::async_read_some(const MutableBuffer &buf,
+             std::function<void (const asio::error_code & error, std::size_t 
bytes_transferred) > handler)
+{
+  event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin());
+
+  mutex_guard state_lock(state_lock_);
+  conn_->async_read_some(buf, handler);
+}
+
+void DataNodeConnectionImpl::async_write_some(const ConstBuffer &buf,
+             std::function<void (const asio::error_code & error, std::size_t 
bytes_transferred) > handler)
+{
+  event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin());
+
+  mutex_guard state_lock(state_lock_);
+  conn_->async_write_some(buf, handler);
+}
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
index 21193b3..a54338f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
@@ -18,7 +18,7 @@
 #ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
 #define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
 
-#include "common/hdfs_ioservice.h"
+#include "hdfspp/ioservice.h"
 #include "common/async_stream.h"
 #include "ClientNamenodeProtocol.pb.h"
 #include "common/libhdfs_events_impl.h"
@@ -58,13 +58,14 @@ private:
   // held (briefly) while posting async ops to the asio task queue
   std::mutex state_lock_;
 public:
+  MEMCHECKED_CLASS(DataNodeConnectionImpl)
   std::unique_ptr<asio::ip::tcp::socket, SocketDeleter> conn_;
   std::array<asio::ip::tcp::endpoint, 1> endpoints_;
   std::string uuid_;
   LibhdfsEvents *event_handlers_;
 
   virtual ~DataNodeConnectionImpl();
-  DataNodeConnectionImpl(asio::io_service * io_service, const 
::hadoop::hdfs::DatanodeInfoProto &dn_proto,
+  DataNodeConnectionImpl(std::shared_ptr<IoService> io_service, const 
::hadoop::hdfs::DatanodeInfoProto &dn_proto,
                           const hadoop::common::TokenProto *token,
                           LibhdfsEvents *event_handlers);
 
@@ -72,24 +73,11 @@ public:
 
   void Cancel() override;
 
-  void async_read_some(const MutableBuffers &buf,
-                         std::function<void (const asio::error_code & error, 
std::size_t bytes_transferred) > handler)
-                       override {
-    event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin());
+  void async_read_some(const MutableBuffer &buf,
+                       std::function<void (const asio::error_code & error, 
std::size_t bytes_transferred) > handler) override;
 
-
-    mutex_guard state_lock(state_lock_);
-    conn_->async_read_some(buf, handler);
-  };
-
-  void async_write_some(const ConstBuffers &buf,
-                          std::function<void (const asio::error_code & error, 
std::size_t bytes_transferred) > handler)
-                        override {
-    event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin());
-
-    mutex_guard state_lock(state_lock_);
-    conn_->async_write_some(buf, handler);
-  }
+  void async_write_some(const ConstBuffer &buf,
+                        std::function<void (const asio::error_code & error, 
std::size_t bytes_transferred) > handler) override;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
index ba702b0..02630fb 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
@@ -36,10 +36,10 @@ FileHandle::~FileHandle() {}
 
 FileHandleImpl::FileHandleImpl(const std::string & cluster_name,
                                const std::string & path,
-                               ::asio::io_service *io_service, const 
std::string &client_name,
-                                 const std::shared_ptr<const struct FileInfo> 
file_info,
-                                 std::shared_ptr<BadDataNodeTracker> 
bad_data_nodes,
-                                 std::shared_ptr<LibhdfsEvents> event_handlers)
+                               std::shared_ptr<IoService> io_service, const 
std::string &client_name,
+                               const std::shared_ptr<const struct FileInfo> 
file_info,
+                               std::shared_ptr<BadDataNodeTracker> 
bad_data_nodes,
+                               std::shared_ptr<LibhdfsEvents> event_handlers)
     : cluster_name_(cluster_name), path_(path), io_service_(io_service), 
client_name_(client_name), file_info_(file_info),
       bad_node_tracker_(bad_data_nodes), offset_(0), 
cancel_state_(CancelTracker::New()), event_handlers_(event_handlers), 
bytes_read_(0) {
   LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl("
@@ -167,7 +167,7 @@ bool FileHandleImpl::CheckSeekBounds(ssize_t 
desired_position) {
  * on the FileHandle
  */
 void FileHandleImpl::AsyncPreadSome(
-    size_t offset, const MutableBuffers &buffers,
+    size_t offset, const MutableBuffer &buffer,
     std::shared_ptr<NodeExclusionRule> excluded_nodes,
     const std::function<void(const Status &, const std::string &, size_t)> 
handler) {
   using ::hadoop::hdfs::DatanodeInfoProto;
@@ -233,7 +233,7 @@ void FileHandleImpl::AsyncPreadSome(
 
   uint64_t offset_within_block = offset - block->offset();
   uint64_t size_within_block = std::min<uint64_t>(
-      block->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
+      block->b().numbytes() - offset_within_block, asio::buffer_size(buffer));
 
   LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
             << FMT_THIS_ADDR << "), ...) Datanode hostname=" << dnHostName << 
", IP Address=" << dnIpAddr
@@ -268,7 +268,7 @@ void FileHandleImpl::AsyncPreadSome(
     handler(status, dn_id, transferred);
   };
 
-  auto connect_handler = 
[handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block,
 buffers, reader, dn_id, client_name]
+  auto connect_handler = 
[handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block,
 buffer, reader, dn_id, client_name]
           (Status status, std::shared_ptr<DataNodeConnection> dn) {
     (void)dn;
     event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, 
cluster_name.c_str(), path.c_str(), 0);
@@ -281,7 +281,7 @@ void FileHandleImpl::AsyncPreadSome(
     if (status.ok()) {
       reader->AsyncReadBlock(
           client_name, *block, offset_within_block,
-          asio::buffer(buffers, size_within_block), read_handler);
+          asio::buffer(buffer, size_within_block), read_handler);
     } else {
       handler(status, dn_id, 0);
     }
@@ -307,7 +307,7 @@ std::shared_ptr<BlockReader> 
FileHandleImpl::CreateBlockReader(const BlockReader
 }
 
 std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
-    ::asio::io_service * io_service,
+    std::shared_ptr<IoService> io_service,
     const ::hadoop::hdfs::DatanodeInfoProto & dn,
     const hadoop::common::TokenProto * token) {
   LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateDataNodeConnection("

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
index 4135156..57da237 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
@@ -18,7 +18,7 @@
 #ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_
 #define LIBHDFSPP_LIB_FS_FILEHANDLE_H_
 
-#include "common/hdfs_ioservice.h"
+#include "hdfspp/ioservice.h"
 #include "common/async_stream.h"
 #include "common/cancel_tracker.h"
 #include "common/libhdfs_events_impl.h"
@@ -26,12 +26,10 @@
 #include "reader/fileinfo.h"
 #include "reader/readergroup.h"
 
-#include "asio.hpp"
 #include "bad_datanode_tracker.h"
 #include "ClientNamenodeProtocol.pb.h"
 
 #include <mutex>
-#include <iostream>
 
 namespace hdfs {
 
@@ -53,7 +51,7 @@ public:
   MEMCHECKED_CLASS(FileHandleImpl)
   FileHandleImpl(const std::string & cluster_name,
                  const std::string & path,
-                 ::asio::io_service *io_service, const std::string 
&client_name,
+                 std::shared_ptr<IoService> io_service, const std::string 
&client_name,
                   const std::shared_ptr<const struct FileInfo> file_info,
                   std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
                   std::shared_ptr<LibhdfsEvents> event_handlers);
@@ -93,7 +91,7 @@ public:
    * If trying to begin a read past the EOF, status will be 
Status::InvalidOffset.
    *
    */
-  void AsyncPreadSome(size_t offset, const MutableBuffers &buffers,
+  void AsyncPreadSome(size_t offset, const MutableBuffer &buffer,
                       std::shared_ptr<NodeExclusionRule> excluded_nodes,
                       const std::function<void(const Status &status,
                       const std::string &dn_id, size_t bytes_read)> handler);
@@ -124,13 +122,13 @@ protected:
                                                          
std::shared_ptr<DataNodeConnection> dn,
                                                          
std::shared_ptr<hdfs::LibhdfsEvents> event_handlers);
   virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
-      ::asio::io_service *io_service,
+      std::shared_ptr<IoService> io_service,
       const ::hadoop::hdfs::DatanodeInfoProto & dn,
       const hadoop::common::TokenProto * token);
 private:
   const std::string cluster_name_;
   const std::string path_;
-  ::asio::io_service * const io_service_;
+  std::shared_ptr<IoService> io_service_;
   const std::string client_name_;
   const std::shared_ptr<const struct FileInfo> file_info_;
   std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index 56d02d8..41cc645 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -18,6 +18,7 @@
 
 #include "filesystem.h"
 
+#include "filehandle.h"
 #include "common/namenode_info.h"
 
 #include <functional>
@@ -104,6 +105,54 @@ FileSystem *FileSystem::New() {
  *                    FILESYSTEM IMPLEMENTATION
  ****************************************************************************/
 
+struct FileSystemImpl::FindSharedState {
+  //Name pattern (can have wild-cards) to find
+  const std::string name;
+  //Maximum depth to recurse after the end of path is reached.
+  //Can be set to 0 for pure path globbing and ignoring name pattern entirely.
+  const uint32_t maxdepth;
+  //Vector of all sub-directories from the path argument (each can have 
wild-cards)
+  std::vector<std::string> dirs;
+  //Callback from Find
+  const std::function<bool(const Status &, const std::vector<StatInfo> &, 
bool)> handler;
+  //outstanding_requests is incremented once for every GetListing call.
+  std::atomic<uint64_t> outstanding_requests;
+  //Boolean needed to abort all recursion on error or on user command
+  std::atomic<bool> aborted;
+  //Shared variables will need protection with a lock
+  std::mutex lock;
+  FindSharedState(const std::string path_, const std::string name_, const 
uint32_t maxdepth_,
+              const std::function<bool(const Status &, const 
std::vector<StatInfo> &, bool)> handler_,
+              uint64_t outstanding_recuests_, bool aborted_)
+      : name(name_),
+        maxdepth(maxdepth_),
+        handler(handler_),
+        outstanding_requests(outstanding_recuests_),
+        aborted(aborted_),
+        lock() {
+    //Constructing the list of sub-directories
+    std::stringstream ss(path_);
+    if(path_.back() != '/'){
+      ss << "/";
+    }
+    for (std::string token; std::getline(ss, token, '/'); ) {
+      dirs.push_back(token);
+    }
+  }
+};
+
+struct FileSystemImpl::FindOperationalState {
+  const std::string path;
+  const uint32_t depth;
+  const bool search_path;
+  FindOperationalState(const std::string path_, const uint32_t depth_, const 
bool search_path_)
+      : path(path_),
+        depth(depth_),
+        search_path(search_path_) {
+  }
+};
+
+
 const std::string get_effective_user_name(const std::string &user_name) {
   if (!user_name.empty())
     return user_name;
@@ -134,10 +183,10 @@ const std::string get_effective_user_name(const 
std::string &user_name) {
 }
 
 FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string 
&user_name, const Options &options) :
-     io_service_(static_cast<IoServiceImpl *>(io_service)), options_(options),
+     io_service_(io_service), options_(options),
      client_name_(GetRandomClientName()),
      nn_(
-       &io_service_->io_service(), options, client_name_,
+       io_service_, options, client_name_,
        get_effective_user_name(user_name), kNamenodeProtocol,
        kNamenodeProtocolVersion
      ),
@@ -166,10 +215,10 @@ FileSystemImpl::FileSystemImpl(IoService *&io_service, 
const std::string &user_n
 }
 
 FileSystemImpl::FileSystemImpl(std::shared_ptr<IoService> io_service, const 
std::string& user_name, const Options &options) :
-     io_service_(std::static_pointer_cast<IoServiceImpl>(io_service)), 
options_(options),
+     io_service_(io_service), options_(options),
      client_name_(GetRandomClientName()),
      nn_(
-       &io_service_->io_service(), options, client_name_,
+       io_service_, options, client_name_,
        get_effective_user_name(user_name), kNamenodeProtocol,
        kNamenodeProtocolVersion
      ),
@@ -178,7 +227,7 @@ FileSystemImpl::FileSystemImpl(std::shared_ptr<IoService> 
io_service, const std:
 {
   LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl("
                          << FMT_THIS_ADDR << ", shared IoService@" << 
io_service_.get() << ") called");
-  int worker_thread_count = io_service_->get_worker_thread_count();
+  int worker_thread_count = io_service_->GetWorkerThreadCount();
   if(worker_thread_count < 1) {
     LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl IoService 
provided doesn't have any worker threads. "
                           << "It needs at least 1 worker to connect to an HDFS 
cluster.")
@@ -217,7 +266,7 @@ void FileSystemImpl::Connect(const std::string &server,
   auto name_service = options_.services.find(server);
   if(name_service != options_.services.end()) {
     cluster_name_ = name_service->first;
-    resolved_namenodes = BulkResolve(&io_service_->io_service(), 
name_service->second);
+    resolved_namenodes = BulkResolve(io_service_, name_service->second);
   } else {
     cluster_name_ = server + ":" + service;
 
@@ -230,7 +279,7 @@ void FileSystemImpl::Connect(const std::string &server,
       handler(Status::Error(("Invalid namenode " + cluster_name_ + " in 
config").c_str()), this);
     }
 
-    resolved_namenodes = BulkResolve(&io_service_->io_service(), {tmp_info});
+    resolved_namenodes = BulkResolve(io_service_, {tmp_info});
   }
 
   for(unsigned int i=0;i<resolved_namenodes.size();i++) {
@@ -282,7 +331,7 @@ int FileSystemImpl::WorkerThreadCount() {
   if(!io_service_) {
     return -1;
   } else {
-    return io_service_->get_worker_thread_count();
+    return io_service_->GetWorkerThreadCount();
   }
 }
 
@@ -339,7 +388,7 @@ void FileSystemImpl::Open(
         LOG_DEBUG(kFileSystem, << "Operation not allowed on standby datanode");
       }
     }
-    handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, 
&io_service_->io_service(), client_name_, file_info, bad_node_tracker_, 
event_handlers_)
+    handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, 
io_service_, client_name_, file_info, bad_node_tracker_, event_handlers_)
                             : nullptr);
   });
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
index f2e9abd..935e7c9 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -18,19 +18,18 @@
 #ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_
 #define LIBHDFSPP_LIB_FS_FILESYSTEM_H_
 
-#include "filehandle.h"
-#include "hdfspp/hdfspp.h"
+#include "namenode_operations.h"
 #include "fs/bad_datanode_tracker.h"
-#include "reader/block_reader.h"
+#include "hdfspp/hdfspp.h"
 #include "reader/fileinfo.h"
 
-#include "asio.hpp"
-
 #include <thread>
-#include "namenode_operations.h"
 
 namespace hdfs {
 
+class FileHandle;
+
+
 /*
  * FileSystem: The consumer's main point of interaction with the cluster as
  * a whole.
@@ -48,6 +47,7 @@ public:
   MEMCHECKED_CLASS(FileSystemImpl)
   typedef std::function<void(const Status &, FileSystem *)> ConnectCallback;
 
+  // Note: Longer term it'd be cleaner to take a rvalue reference to a shared_ptr to get ownership
   explicit FileSystemImpl(IoService *&io_service, const std::string& 
user_name, const Options &options);
   explicit FileSystemImpl(std::shared_ptr<IoService>, const std::string& 
user_name, const Options &options);
   ~FileSystemImpl() override;
@@ -215,7 +215,7 @@ private:
    *  A side effect of this is that requests may outlive the RpcEngine they
    *  reference.
    **/
-  std::shared_ptr<IoServiceImpl> io_service_;
+  std::shared_ptr<IoService> io_service_;
   const Options options_;
   const std::string client_name_;
   std::string cluster_name_;
@@ -234,53 +234,11 @@ private:
 
   void GetListingShim(const Status &stat, const std::vector<StatInfo> & 
stat_infos, bool has_more,
               std::string path, const std::function<bool(const Status &, const 
std::vector<StatInfo> &, bool)> &handler);
-
-  struct FindSharedState {
-    //Name pattern (can have wild-cards) to find
-    const std::string name;
-    //Maximum depth to recurse after the end of path is reached.
-    //Can be set to 0 for pure path globbing and ignoring name pattern entirely.
-    const uint32_t maxdepth;
-    //Vector of all sub-directories from the path argument (each can have wild-cards)
-    std::vector<std::string> dirs;
-    //Callback from Find
-    const std::function<bool(const Status &, const std::vector<StatInfo> &, 
bool)> handler;
-    //outstanding_requests is incremented once for every GetListing call.
-    std::atomic<uint64_t> outstanding_requests;
-    //Boolean needed to abort all recursion on error or on user command
-    std::atomic<bool> aborted;
-    //Shared variables will need protection with a lock
-    std::mutex lock;
-    FindSharedState(const std::string path_, const std::string name_, const 
uint32_t maxdepth_,
-                const std::function<bool(const Status &, const 
std::vector<StatInfo> &, bool)> handler_,
-                uint64_t outstanding_recuests_, bool aborted_)
-        : name(name_),
-          maxdepth(maxdepth_),
-          handler(handler_),
-          outstanding_requests(outstanding_recuests_),
-          aborted(aborted_),
-          lock() {
-      //Constructing the list of sub-directories
-      std::stringstream ss(path_);
-      if(path_.back() != '/'){
-        ss << "/";
-      }
-      for (std::string token; std::getline(ss, token, '/'); ) {
-        dirs.push_back(token);
-      }
-    }
-  };
-
-  struct FindOperationalState {
-    const std::string path;
-    const uint32_t depth;
-    const bool search_path;
-    FindOperationalState(const std::string path_, const uint32_t depth_, const 
bool search_path_)
-        : path(path_),
-          depth(depth_),
-          search_path(search_path_) {
-    }
-  };
+  /**
+   * Helper struct to store state for recursive find
+   */
+  struct FindSharedState;
+  struct FindOperationalState;
 
   void FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos,
                 bool directory_has_more, std::shared_ptr<FindOperationalState> 
current_state, std::shared_ptr<FindSharedState> shared_state);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
index f4caa18..3470a48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h
@@ -42,7 +42,7 @@ namespace hdfs {
 class NameNodeOperations {
 public:
   MEMCHECKED_CLASS(NameNodeOperations)
-  NameNodeOperations(::asio::io_service *io_service, const Options &options,
+  NameNodeOperations(std::shared_ptr<IoService> io_service, const Options 
&options,
             const std::string &client_name, const std::string &user_name,
             const char *protocol_name, int protocol_version) :
   io_service_(io_service),
@@ -119,7 +119,7 @@ private:
   static void 
DirectoryListingProtoToStatInfo(std::shared_ptr<std::vector<StatInfo>> 
stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl);
   static void GetFsStatsResponseProtoToFsInfo(hdfs::FsInfo & fs_info, const 
std::shared_ptr<::hadoop::hdfs::GetFsStatsResponseProto> & fs);
 
-  ::asio::io_service * io_service_;
+  std::shared_ptr<IoService> io_service_;
 
   // This is the only permanent owner of the RpcEngine, however the RPC layer
   // needs to reference count it prevent races during FileSystem destruction.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
index ca7715d..90c02f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
@@ -431,7 +431,7 @@ private:
   std::shared_ptr<DataNodeConnection> shared_conn_;
 };
 
-void BlockReaderImpl::AsyncReadPacket(const MutableBuffers &buffers,
+void BlockReaderImpl::AsyncReadPacket(const MutableBuffer &buffer,
     const std::function<void(const Status &, size_t bytes_transferred)> 
&handler)
 {
   assert(state_ != kOpen && "Not connected");
@@ -450,7 +450,7 @@ void BlockReaderImpl::AsyncReadPacket(const MutableBuffers 
&buffers,
       .Push(new ReadChecksum(this))
       .Push(new ReadPadding(this))
       .Push(new ReadData(
-          this, m->state().bytes_transferred, buffers))
+          this, m->state().bytes_transferred, buffer))
       .Push(new AckRead(this));
 
   auto self = this->shared_from_this();
@@ -460,14 +460,14 @@ void BlockReaderImpl::AsyncReadPacket(const 
MutableBuffers &buffers,
 }
 
 
-size_t BlockReaderImpl::ReadPacket(const MutableBuffers &buffers, Status 
*status)
+size_t BlockReaderImpl::ReadPacket(const MutableBuffer &buffer, Status *status)
 {
   LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacket called");
 
   size_t transferred = 0;
   auto done = std::make_shared<std::promise<void>>();
   auto future = done->get_future();
-  AsyncReadPacket(buffers,
+  AsyncReadPacket(buffer,
                   [status, &transferred, done](const Status &stat, size_t t) {
                     *status = stat;
                     transferred = t;
@@ -504,7 +504,7 @@ private:
 
 struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation
 {
-  ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer, size_t 
*transferred)
+  ReadBlockContinuation(BlockReader *reader, MutableBuffer buffer, size_t 
*transferred)
       : reader_(reader), buffer_(buffer), 
buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {}
 
   virtual void Run(const Next &next) override {
@@ -517,7 +517,7 @@ struct BlockReaderImpl::ReadBlockContinuation : 
continuation::Continuation
 
 private:
   BlockReader *reader_;
-  const MutableBuffers buffer_;
+  const MutableBuffer buffer_;
   const size_t buffer_size_;
   size_t *transferred_;
   std::function<void(const Status &)> next_;
@@ -542,7 +542,7 @@ void BlockReaderImpl::AsyncReadBlock(
     const std::string & client_name,
     const hadoop::hdfs::LocatedBlockProto &block,
     size_t offset,
-    const MutableBuffers &buffers,
+    const MutableBuffer &buffer,
     const std::function<void(const Status &, size_t)> handler)
 {
   LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadBlock("
@@ -551,10 +551,10 @@ void BlockReaderImpl::AsyncReadBlock(
   auto m = continuation::Pipeline<size_t>::Create(cancel_state_);
   size_t * bytesTransferred = &m->state();
 
-  size_t size = asio::buffer_size(buffers);
+  size_t size = asio::buffer_size(buffer);
 
   m->Push(new RequestBlockContinuation(this, client_name, &block.b(), size, 
offset))
-    .Push(new ReadBlockContinuation(this, buffers, bytesTransferred));
+    .Push(new ReadBlockContinuation(this, buffer, bytesTransferred));
 
   m->Run([handler] (const Status &status, const size_t totalBytesTransferred) {
     handler(status, totalBytesTransferred);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
index b5cbdf5..167c57d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
@@ -72,11 +72,11 @@ public:
   virtual void AsyncReadBlock(
     const std::string & client_name,
     const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
-    const MutableBuffers &buffers,
+    const MutableBuffer &buffer,
     const std::function<void(const Status &, size_t)> handler) = 0;
 
   virtual void AsyncReadPacket(
-    const MutableBuffers &buffers,
+    const MutableBuffer &buffer,
     const std::function<void(const Status &, size_t bytes_transferred)> 
&handler) = 0;
 
   virtual void AsyncRequestBlock(
@@ -98,7 +98,7 @@ public:
         chunk_padding_bytes_(0), cancel_state_(cancel_state), 
event_handlers_(event_handlers.get()) {}
 
   virtual void AsyncReadPacket(
-    const MutableBuffers &buffers,
+    const MutableBuffer &buffer,
     const std::function<void(const Status &, size_t bytes_transferred)> 
&handler) override;
 
   virtual void AsyncRequestBlock(
@@ -111,12 +111,12 @@ public:
   virtual void AsyncReadBlock(
     const std::string & client_name,
     const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
-    const MutableBuffers &buffers,
+    const MutableBuffer &buffer,
     const std::function<void(const Status &, size_t)> handler) override;
 
   virtual void CancelOperation() override;
 
-  size_t ReadPacket(const MutableBuffers &buffers, Status *status);
+  size_t ReadPacket(const MutableBuffer &buffer, Status *status);
 
   Status RequestBlock(
     const std::string &client_name,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
index 93103c5..2b36f59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
@@ -44,13 +44,13 @@ public:
 
   template <class Handler> void Handshake(const Handler &next);
 
-  void async_read_some(const MutableBuffers &buf,
+  void async_read_some(const MutableBuffer &buf,
           std::function<void (const asio::error_code & error,
                                  std::size_t bytes_transferred) > handler) 
override {
     stream_->async_read_some(buf, handler);
   }
 
-  void async_write_some(const ConstBuffers &buf,
+  void async_write_some(const ConstBuffer &buf,
             std::function<void (const asio::error_code & error,
                                  std::size_t bytes_transferred) > handler) 
override {
     stream_->async_write_some(buf, handler);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc
index e83a28c..242c6ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc
@@ -35,7 +35,7 @@ static std::string format_endpoints(const 
std::vector<::asio::ip::tcp::endpoint>
 }
 
 HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> 
&servers,
-                                     ::asio::io_service *ioservice,
+                                     std::shared_ptr<IoService> ioservice,
                                      std::shared_ptr<LibhdfsEvents> 
event_handlers)
                   : enabled_(false), resolved_(false),
                     ioservice_(ioservice), event_handlers_(event_handlers)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h
index cc34f51..032b1d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h
@@ -40,7 +40,7 @@ namespace hdfs {
 class HANamenodeTracker {
  public:
   HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
-                    ::asio::io_service *ioservice,
+                    std::shared_ptr<IoService> ioservice,
                     std::shared_ptr<LibhdfsEvents> event_handlers_);
 
   virtual ~HANamenodeTracker();
@@ -66,7 +66,7 @@ class HANamenodeTracker {
   bool resolved_;
 
   // Keep service in case a second round of DNS lookup is required
-  ::asio::io_service *ioservice_;
+  std::shared_ptr<IoService> ioservice_;
 
   // Event handlers, for now this is the simplest place to catch all failover events
   // and push info out to client application.  Possibly move into RPCEngine.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc
index 356411e..9157476 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc
@@ -20,6 +20,7 @@
 #include "request.h"
 #include "rpc_engine.h"
 #include "sasl_protocol.h"
+#include "hdfspp/ioservice.h"
 
 #include "RpcHeader.pb.h"
 #include "ProtobufRpcEngine.pb.h"
@@ -118,7 +119,7 @@ Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, 
const std::string &m
     : engine_(engine),
       method_name_(method_name),
       call_id_(call_id),
-      timer_(engine->io_service()),
+      timer_(engine->io_service()->GetRaw()),
       handler_(std::move(handler)),
       retry_count_(engine->retry_policy() ? 0 : kNoRetry),
       failover_count_(0)
@@ -129,7 +130,7 @@ Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, 
const std::string &m
 Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, Handler &&handler)
     : engine_(engine),
       call_id_(-1/*Handshake ID*/),
-      timer_(engine->io_service()),
+      timer_(engine->io_service()->GetRaw()),
       handler_(std::move(handler)),
       retry_count_(engine->retry_policy() ? 0 : kNoRetry),
       failover_count_(0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
index 9e54983..9f7b3bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h
@@ -83,7 +83,7 @@ class RpcConnection : public 
std::enable_shared_from_this<RpcConnection> {
   void SetAuthInfo(const AuthInfo& auth_info);
 
   std::weak_ptr<LockFreeRpcEngine> engine() { return engine_; }
-  ::asio::io_service *GetIoService();
+  std::shared_ptr<IoService> GetIoService();
 
  protected:
   struct Response {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc
index ee9b704..43111ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc
@@ -70,27 +70,27 @@ 
RpcConnection::RpcConnection(std::shared_ptr<LockFreeRpcEngine> engine)
     : engine_(engine),
       connected_(kNotYetConnected) {}
 
-::asio::io_service *RpcConnection::GetIoService() {
+std::shared_ptr<IoService> RpcConnection::GetIoService() {
   std::shared_ptr<LockFreeRpcEngine> pinnedEngine = engine_.lock();
   if(!pinnedEngine) {
     LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access 
invalid RpcEngine");
     return nullptr;
   }
 
-  return &pinnedEngine->io_service();
+  return pinnedEngine->io_service();
 }
 
 void RpcConnection::StartReading() {
   auto shared_this = shared_from_this();
-  ::asio::io_service *service = GetIoService();
+  std::shared_ptr<IoService> service = GetIoService();
   if(!service) {
     LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access 
invalid IoService");
     return;
   }
 
-  service->post([shared_this, this] () {
-    OnRecvCompleted(::asio::error_code(), 0);
-  });
+  service->PostLambda(
+    [shared_this, this] () { OnRecvCompleted(::asio::error_code(), 0); }
+  );
 }
 
 void RpcConnection::HandshakeComplete(const Status &s) {
@@ -164,13 +164,14 @@ void RpcConnection::ContextComplete(const Status &s) {
 void RpcConnection::AsyncFlushPendingRequests() {
   std::shared_ptr<RpcConnection> shared_this = shared_from_this();
 
-  ::asio::io_service *service = GetIoService();
+  std::shared_ptr<IoService> service = GetIoService();
   if(!service) {
     LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access 
invalid IoService");
     return;
   }
 
-  service->post([shared_this, this]() {
+  std::function<void()> task = [shared_this, this]()
+  {
     std::lock_guard<std::mutex> state_lock(connection_state_lock_);
 
     LOG_TRACE(kRPC, << "RpcConnection::AsyncFlushPendingRequests called 
(connected=" << ToString(connected_) << ")");
@@ -178,7 +179,10 @@ void RpcConnection::AsyncFlushPendingRequests() {
     if (!outgoing_request_) {
       FlushPendingRequests();
     }
-  });
+  };
+
+  service->PostTask(task);
+
 }
 
 Status RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
@@ -228,15 +232,17 @@ Status 
RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
     return status;
   }
 
-  ::asio::io_service *service = GetIoService();
+  std::shared_ptr<IoService> service = GetIoService();
   if(!service) {
     LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access 
invalid IoService");
     return Status::Error("RpcConnection attempted to access invalid 
IoService");
   }
 
-  service->post([req, response, status]() {
-    req->OnResponseArrived(response->in.get(), status);  // Never call back 
while holding a lock
-  });
+  service->PostLambda(
+    [req, response, status]() {
+      req->OnResponseArrived(response->in.get(), status);  // Never call back 
while holding a lock
+    }
+  );
 
   return Status::OK();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to