HDFS-13403: libhdfs++ Use hdfs::IoService object rather than asio::io_service. Contributed by James Clampffer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eefe2a14 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eefe2a14 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eefe2a14 Branch: refs/heads/HDFS-7240 Commit: eefe2a147c83dbb62c4021b67d59d3b9f065f890 Parents: 7eb783e Author: James Clampffer <j...@apache.org> Authored: Wed Apr 11 10:27:23 2018 -0400 Committer: James Clampffer <j...@apache.org> Committed: Wed Apr 11 10:27:23 2018 -0400 ---------------------------------------------------------------------- .../native/libhdfspp/include/hdfspp/hdfspp.h | 53 +------ .../native/libhdfspp/include/hdfspp/ioservice.h | 140 ++++++++++++++++ .../native/libhdfspp/lib/bindings/c/hdfs.cc | 7 +- .../native/libhdfspp/lib/common/CMakeLists.txt | 2 +- .../native/libhdfspp/lib/common/async_stream.h | 13 +- .../libhdfspp/lib/common/continuation/asio.h | 5 - .../libhdfspp/lib/common/hdfs_ioservice.cc | 146 ----------------- .../libhdfspp/lib/common/hdfs_ioservice.h | 79 --------- .../libhdfspp/lib/common/ioservice_impl.cc | 159 +++++++++++++++++++ .../libhdfspp/lib/common/ioservice_impl.h | 76 +++++++++ .../main/native/libhdfspp/lib/common/logging.h | 3 - .../libhdfspp/lib/common/namenode_info.cc | 15 +- .../native/libhdfspp/lib/common/namenode_info.h | 8 +- .../main/native/libhdfspp/lib/common/util.cc | 14 +- .../src/main/native/libhdfspp/lib/common/util.h | 25 ++- .../lib/connection/datanodeconnection.cc | 27 +++- .../lib/connection/datanodeconnection.h | 26 +-- .../main/native/libhdfspp/lib/fs/filehandle.cc | 18 +-- .../main/native/libhdfspp/lib/fs/filehandle.h | 12 +- .../main/native/libhdfspp/lib/fs/filesystem.cc | 67 ++++++-- .../main/native/libhdfspp/lib/fs/filesystem.h | 66 ++------ .../libhdfspp/lib/fs/namenode_operations.h | 4 +- .../native/libhdfspp/lib/reader/block_reader.cc | 18 +-- .../native/libhdfspp/lib/reader/block_reader.h | 10 +- .../native/libhdfspp/lib/reader/datatransfer.h | 4 
+- .../libhdfspp/lib/rpc/namenode_tracker.cc | 2 +- .../native/libhdfspp/lib/rpc/namenode_tracker.h | 4 +- .../main/native/libhdfspp/lib/rpc/request.cc | 5 +- .../native/libhdfspp/lib/rpc/rpc_connection.h | 2 +- .../libhdfspp/lib/rpc/rpc_connection_impl.cc | 32 ++-- .../libhdfspp/lib/rpc/rpc_connection_impl.h | 9 +- .../main/native/libhdfspp/lib/rpc/rpc_engine.cc | 14 +- .../main/native/libhdfspp/lib/rpc/rpc_engine.h | 9 +- .../native/libhdfspp/tests/bad_datanode_test.cc | 31 ++-- .../libhdfspp/tests/hdfs_ioservice_test.cc | 10 +- .../native/libhdfspp/tests/mock_connection.h | 4 +- .../libhdfspp/tests/remote_block_reader_test.cc | 4 +- .../native/libhdfspp/tests/rpc_engine_test.cc | 112 ++++++------- 38 files changed, 681 insertions(+), 554 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h index 2fdeec9..e68a612 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h @@ -19,6 +19,7 @@ #define LIBHDFSPP_HDFSPP_H_ #include "hdfspp/options.h" +#include "hdfspp/ioservice.h" #include "hdfspp/status.h" #include "hdfspp/events.h" #include "hdfspp/block_location.h" @@ -31,62 +32,10 @@ #include <functional> #include <memory> -#include <set> -#include <iostream> namespace hdfs { /** - * An IoService manages a queue of asynchronous tasks. All libhdfs++ - * operations are filed against a particular IoService. 
- * - * When an operation is queued into an IoService, the IoService will - * run the callback handler associated with the operation. Note that - * the IoService must be stopped before destructing the objects that - * post the operations. - * - * From an implementation point of view the hdfs::IoService provides - * a thin wrapper over an asio::io_service object so that additional - * instrumentation and functionality can be added. - **/ - -class IoService : public std::enable_shared_from_this<IoService> -{ - public: - static IoService *New(); - static std::shared_ptr<IoService> MakeShared(); - virtual ~IoService(); - - /** - * Start up as many threads as there are logical processors. - * Return number of threads created. - **/ - virtual unsigned int InitDefaultWorkers() = 0; - - /** - * Initialize with thread_count handler threads. - * If thread count is less than one print a log message and default to one thread. - * Return number of threads created. - **/ - virtual unsigned int InitWorkers(unsigned int thread_count) = 0; - - /** - * Place an item on the execution queue. Will be invoked from outside of the calling context. - **/ - virtual void PostTask(std::function<void(void)>& asyncTask) = 0; - - /** - * Run the asynchronous tasks associated with this IoService. - **/ - virtual void Run() = 0; - /** - * Stop running asynchronous tasks associated with this IoService. - * All worker threads will return as soon as they finish executing their current task. - **/ - virtual void Stop() = 0; -}; - -/** * A node exclusion rule provides a simple way of testing if the * client should attempt to connect to a node based on the node's * UUID. 
The FileSystem and FileHandle use the BadDataNodeTracker http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/ioservice.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/ioservice.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/ioservice.h new file mode 100644 index 0000000..9805bad --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/ioservice.h @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * An asio::io_service maintains a queue of asynchronous tasks and invokes them + * when they are ready to run. Async network IO handlers become runnable when + * the associated IO operation has completed. The hdfs::IoService is a thin + * wrapper over that object to make it easier to add logging and instrumentation + * to tasks that have been queued. + * + * Lifecycle management: + * -The IoService *shall* outlive any tasks it owns. 
Deleting a task + * before it has been run **will** result in dangling reference issues. + * -Dependencies (including transitive dependencies) of pending tasks + * *shall* outlive the task. Failure to ensure this **will** result in + * dangling reference issues. + * -libhdfs++ uses shared_ptr/weak_ptr heavily as a mechanism to ensure + * liveness of dependencies. + * -refcounted pointers in lambda capture lists have a poor track record + * for ensuring liveness in this library; it's easy to omit them because + * the capture list isn't context aware. Developers are encouraged to + * write callable classes that explicitly list dependencies. + * + * Constraints on tasks: + * -Tasks and async callbacks *shall* never do blocking IO or sleep(). + * At best this hurts performance by preventing worker threads from doing + * useful work. It may also cause situations that look like deadlocks + * if the worker thread is stalled for long enough. + * -Tasks and async callbacks *shall* not acquire locks that guard resources + * that might be unavailable for an unknown amount of time. Lock acquisition + * when accessing shared data structures is acceptable and is often required. + * -Tasks and async callbacks *should* not allow exceptions to escape their + * scope since tasks will be executed on a different stack than where they + * were created. The exception will be caught by the IoService rather than + * being forwarded to the next task. + * -Tasks and async callbacks *should* not rely on thread local storage for + * ancillary context. The IoService does not support any sort of thread + * affinity that would guarantee tasks posted (PostTask/PostLambda) from one thread will always + * be executed on the same thread. Applications that only use a single + * worker thread may use TLS but developers should be mindful that throughput + * can no longer be scaled by adding threads.
+ **/ +#ifndef INCLUDE_HDFSPP_IOSERVICE_H_ +#define INCLUDE_HDFSPP_IOSERVICE_H_ + +#include <functional> +#include <memory> +#include <utility> +// forward decl +namespace asio { + class io_service; +} + +namespace hdfs { +// (Un)comment this to determine if issues are due to concurrency or logic faults +// If tests still fail with concurrency disabled it's most likely a logic bug +#define DISABLE_CONCURRENT_WORKERS + +class IoService : public std::enable_shared_from_this<IoService> +{ + public: + static IoService *New(); + static std::shared_ptr<IoService> MakeShared(); + virtual ~IoService(); + + /** + * Start up as many threads as there are logical processors. + * Return number of threads created. + **/ + virtual unsigned int InitDefaultWorkers() = 0; + + /** + * Initialize with thread_count handler threads. + * If thread count is less than one print a log message and default to one thread. + * Return number of threads created. + **/ + virtual unsigned int InitWorkers(unsigned int thread_count) = 0; + + /** + * Add a worker thread to existing pool. + * Return true on success, false otherwise. + **/ + virtual bool AddWorkerThread() = 0; + + /** + * Return the number of worker threads in use. + **/ + virtual unsigned int GetWorkerThreadCount() = 0; + + /** + * Enqueue an item for deferred execution. Non-blocking. + * Task will be invoked from outside of the calling context. + **/ + virtual void PostTask(std::function<void(void)> asyncTask) = 0; + + /** + * Provide type erasure for lambdas defined inside the argument list; the callable is wrapped in a std::function once (rvalues moved, lvalues copied) and that wrapper is moved into PostTask(). + **/ + template <typename LambdaInstance> + inline void PostLambda(LambdaInstance&& func) + { + std::function<void(void)> typeEraser(std::forward<LambdaInstance>(func)); + this->PostTask(std::move(typeEraser)); + } + + /** + * Run the asynchronous tasks associated with this IoService. + **/ + virtual void Run() = 0; + /** + * Stop running asynchronous tasks associated with this IoService. + * All worker threads will return as soon as they finish executing their current task.
+ **/ + virtual void Stop() = 0; + + /** + * Access underlying io_service object. Only to be used in asio library calls. + * After HDFS-11884 is complete only tests should need direct access to the asio::io_service. + **/ + virtual asio::io_service& GetRaw() = 0; +}; + + +} // namespace hdfs +#endif // include guard http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index 9a7c8b4..6b2468f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -17,18 +17,17 @@ */ #include "hdfspp/hdfspp.h" +#include "hdfspp/hdfs_ext.h" -#include "fs/filesystem.h" #include "common/hdfs_configuration.h" #include "common/configuration_loader.h" #include "common/logging.h" +#include "fs/filesystem.h" +#include "fs/filehandle.h" -#include <hdfs/hdfs.h> -#include <hdfspp/hdfs_ext.h> #include <libgen.h> #include "limits.h" - #include <string> #include <cstring> #include <iostream> http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt index 5d9e52c..1ab04d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt +++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt @@ -19,6 +19,6 @@ if(NEED_LINK_DL) set(LIB_DL dl) endif() -add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc content_summary.cc locks.cc config_parser.cc) +add_library(common_obj OBJECT status.cc sasl_digest_md5.cc ioservice_impl.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc content_summary.cc locks.cc config_parser.cc) add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>) target_link_libraries(common ${LIB_DL}) http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h index 575904c..efe2e1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h @@ -19,12 +19,15 @@ #ifndef LIB_COMMON_ASYNC_STREAM_H_ #define LIB_COMMON_ASYNC_STREAM_H_ -#include <asio.hpp> +#include <asio/buffer.hpp> +#include <asio/error_code.hpp> +#include <functional> namespace hdfs { -typedef asio::mutable_buffers_1 MutableBuffers; -typedef asio::const_buffers_1 ConstBuffers; +// Contiguous buffer types +typedef asio::mutable_buffers_1 
MutableBuffer; +typedef asio::const_buffers_1 ConstBuffer; /* * asio-compatible stream implementation. @@ -35,11 +38,11 @@ typedef asio::const_buffers_1 ConstBuffers; */ class AsyncStream { public: - virtual void async_read_some(const MutableBuffers &buf, + virtual void async_read_some(const MutableBuffer &buf, std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) = 0; - virtual void async_write_some(const ConstBuffers &buf, + virtual void async_write_some(const ConstBuffer &buf, std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) = 0; }; http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h index 193358f..0215176 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h @@ -20,13 +20,8 @@ #include "continuation.h" #include "common/util.h" - #include "hdfspp/status.h" - -#include <asio/connect.hpp> -#include <asio/read.hpp> #include <asio/write.hpp> -#include <asio/ip/tcp.hpp> #include <memory> namespace hdfs { http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc deleted file mode 100644 index 578b782..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.cc +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "hdfs_ioservice.h" - -#include <thread> -#include <mutex> -#include <vector> - -#include "common/logging.h" - -namespace hdfs { - -IoService::~IoService() {} - -IoService *IoService::New() { - return new IoServiceImpl(); -} - -std::shared_ptr<IoService> IoService::MakeShared() { - return std::make_shared<IoServiceImpl>(); -} - - -unsigned int IoServiceImpl::InitDefaultWorkers() { - LOG_TRACE(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers@" << this << " called."); - unsigned int logical_thread_count = std::thread::hardware_concurrency(); -#ifndef DISABLE_CONCURRENT_WORKERS - if(logical_thread_count < 1) { - LOG_WARN(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers did not detect any logical processors. 
Defaulting to 1 worker thread."); - } else { - LOG_DEBUG(kRPC, << "IoServiceImpl::InitDefaultWorkers detected " << logical_thread_count << " logical threads and will spawn a worker for each."); - } -#else - if(logical_thread_count > 0) { - LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers: " << logical_thread_count << " threads available. Concurrent workers are disabled so 1 worker thread will be used"); - } - logical_thread_count = 1; -#endif - return InitWorkers(logical_thread_count); -} - -unsigned int IoServiceImpl::InitWorkers(unsigned int thread_count) { -#ifdef DISABLED_CONCURRENT_WORKERS - LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitWorkers: " << thread_count << " threads specified but concurrent workers are disabled so 1 will be used"); - thread_count = 1; -#endif - unsigned int created_threads = 0; - for(unsigned int i=0; i<thread_count; i++) { - bool created = AddWorkerThread(); - if(created) { - created_threads++; - } else { - LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers failed to create a worker thread"); - } - } - if(created_threads != thread_count) { - LOG_WARN(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers attempted to create " - << thread_count << " but only created " << created_threads - << " worker threads. 
Make sure this process has adequate resources."); - } - return created_threads; -} - -bool IoServiceImpl::AddWorkerThread() { - mutex_guard state_lock(state_lock_); - auto async_worker = [this]() { - this->ThreadStartHook(); - this->Run(); - this->ThreadExitHook(); - }; - worker_threads_.push_back(WorkerPtr( new std::thread(async_worker)) ); - return true; -} - - -void IoServiceImpl::ThreadStartHook() { - mutex_guard state_lock(state_lock_); - LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() << " for IoServiceImpl@" << this << " starting"); -} - -void IoServiceImpl::ThreadExitHook() { - mutex_guard state_lock(state_lock_); - LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() << " for IoServiceImpl@" << this << " exiting"); -} - -void IoServiceImpl::PostTask(std::function<void(void)>& asyncTask) { - io_service_.post(asyncTask); -} - -void IoServiceImpl::WorkerDeleter::operator()(std::thread *t) { - // It is far too easy to destroy the filesystem (and thus the threadpool) - // from within one of the worker threads, leading to a deadlock. Let's - // provide some explicit protection. - if(t->get_id() == std::this_thread::get_id()) { - LOG_ERROR(kAsyncRuntime, << "FileSystemImpl::WorkerDeleter::operator(treadptr=" - << t << ") : FATAL: Attempted to destroy a thread pool" - "from within a callback of the thread pool!"); - } - t->join(); - delete t; -} - -// As long as this just forwards to an asio::io_service method it doesn't need a lock -void IoServiceImpl::Run() { - // The IoService executes callbacks provided by library users in the context of worker threads, - // there is no way of preventing those callbacks from throwing but we can at least prevent them - // from escaping this library and crashing the process. 
- - // As recommended in http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers - asio::io_service::work work(io_service_); - while(true) - { - try - { - io_service_.run(); - break; - } catch (const std::exception & e) { - LOG_WARN(kFileSystem, << "Unexpected exception in libhdfspp worker thread: " << e.what()); - } catch (...) { - LOG_WARN(kFileSystem, << "Unexpected value not derived from std::exception in libhdfspp worker thread"); - } - } -} - -unsigned int IoServiceImpl::get_worker_thread_count() { - mutex_guard state_lock(state_lock_); - return worker_threads_.size(); -} - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h deleted file mode 100644 index 294252b..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_ioservice.h +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef COMMON_HDFS_IOSERVICE_H_ -#define COMMON_HDFS_IOSERVICE_H_ - -#include "hdfspp/hdfspp.h" - -#include <asio/io_service.hpp> -#include "common/util.h" - -#include <mutex> -#include <thread> - -namespace hdfs { - -// Uncomment this to determine if issues are due to concurrency or logic faults -// If tests still fail with concurrency disabled it's most likely a logic bug -#define DISABLE_CONCURRENT_WORKERS - -/* - * A thin wrapper over the asio::io_service with a few extras - * -manages it's own worker threads - * -some helpers for sharing with multiple modules that need to do async work - */ - -class IoServiceImpl : public IoService { - public: - IoServiceImpl() {} - - virtual unsigned int InitDefaultWorkers() override; - virtual unsigned int InitWorkers(unsigned int thread_count) override; - virtual void PostTask(std::function<void(void)>& asyncTask) override; - virtual void Run() override; - virtual void Stop() override { io_service_.stop(); } - - // Add a single worker thread, in the common case try to avoid this in favor - // of Init[Default]Workers. Public for use by tests and rare cases where a - // client wants very explicit control of threading for performance reasons - // e.g. pinning threads to NUMA nodes. 
- bool AddWorkerThread(); - - // Be very careful about using this: HDFS-10241 - ::asio::io_service &io_service() { return io_service_; } - unsigned int get_worker_thread_count(); - private: - std::mutex state_lock_; - ::asio::io_service io_service_; - - // For doing logging + resource manager updates on thread start/exit - void ThreadStartHook(); - void ThreadExitHook(); - - // Support for async worker threads - struct WorkerDeleter { - void operator()(std::thread *t); - }; - typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr; - std::vector<WorkerPtr> worker_threads_; -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.cc new file mode 100644 index 0000000..de081ed --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.cc @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ioservice_impl.h" + +#include <thread> +#include <mutex> +#include <vector> + +#include "common/util.h" +#include "common/logging.h" + + +namespace hdfs { + +IoService::~IoService() {} + +IoService *IoService::New() { + return new IoServiceImpl(); +} + +std::shared_ptr<IoService> IoService::MakeShared() { + return std::make_shared<IoServiceImpl>(); +} + + +unsigned int IoServiceImpl::InitDefaultWorkers() { + LOG_TRACE(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers@" << this << " called."); + unsigned int logical_thread_count = std::thread::hardware_concurrency(); +#ifndef DISABLE_CONCURRENT_WORKERS + if(logical_thread_count < 1) { + LOG_WARN(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers did not detect any logical processors. Defaulting to 1 worker thread."); + } else { + LOG_DEBUG(kRPC, << "IoServiceImpl::InitDefaultWorkers detected " << logical_thread_count << " logical threads and will spawn a worker for each."); + } +#else + if(logical_thread_count > 0) { + LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitDefaultWorkers: " << logical_thread_count << " threads available. 
Concurrent workers are disabled so 1 worker thread will be used"); + } + logical_thread_count = 1; +#endif + return InitWorkers(logical_thread_count); +} + +unsigned int IoServiceImpl::InitWorkers(unsigned int thread_count) { +#ifdef DISABLE_CONCURRENT_WORKERS + LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl::InitWorkers: " << thread_count << " threads specified but concurrent workers are disabled so 1 will be used"); + thread_count = 1; +#endif + unsigned int created_threads = 0; + for(unsigned int i=0; i<thread_count; i++) { + bool created = AddWorkerThread(); + if(created) { + created_threads++; + } else { + LOG_DEBUG(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers failed to create a worker thread"); + } + } + if(created_threads != thread_count) { + LOG_WARN(kAsyncRuntime, << "IoServiceImpl@" << this << " ::InitWorkers attempted to create " + << thread_count << " but only created " << created_threads + << " worker threads. Make sure this process has adequate resources."); + } + return created_threads; +} + +bool IoServiceImpl::AddWorkerThread() { + mutex_guard state_lock(state_lock_); + auto async_worker = [this]() { + this->ThreadStartHook(); + this->Run(); + this->ThreadExitHook(); + }; + worker_threads_.push_back(WorkerPtr( new std::thread(async_worker)) ); + return true; +} + + +void IoServiceImpl::ThreadStartHook() { + mutex_guard state_lock(state_lock_); + LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() << " for IoServiceImpl@" << this << " starting"); +} + +void IoServiceImpl::ThreadExitHook() { + mutex_guard state_lock(state_lock_); + LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() << " for IoServiceImpl@" << this << " exiting"); +} + +void IoServiceImpl::PostTask(std::function<void(void)> asyncTask) { + io_service_.post(std::move(asyncTask)); +} + +void IoServiceImpl::WorkerDeleter::operator()(std::thread *t) { + // It is far too easy to destroy the filesystem (and thus the threadpool) + // from
within one of the worker threads, leading to a deadlock. Let's + // provide some explicit protection (logging only: t->join() below still throws std::system_error on a self-join). + if(t->get_id() == std::this_thread::get_id()) { + LOG_ERROR(kAsyncRuntime, << "IoServiceImpl::WorkerDeleter::operator(threadptr=" + << t << ") : FATAL: Attempted to destroy a thread pool " + "from within a callback of the thread pool!"); + } + t->join(); + delete t; +} + +// As long as this just forwards to an asio::io_service method it doesn't need a lock +void IoServiceImpl::Run() { + // The IoService executes callbacks provided by library users in the context of worker threads, + // there is no way of preventing those callbacks from throwing but we can at least prevent them + // from escaping this library and crashing the process. + + // As recommended in http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers + asio::io_service::work work(io_service_); + while(true) + { + try + { + io_service_.run(); + break; + } catch (const std::exception & e) { + LOG_WARN(kFileSystem, << "Unexpected exception in libhdfspp worker thread: " << e.what()); + } catch (...) { + LOG_WARN(kFileSystem, << "Caught unexpected value not derived from std::exception in libhdfspp worker thread"); + } + } +} + +void IoServiceImpl::Stop() { + // Note: This doesn't wait for running operations to stop.
+ io_service_.stop(); +} + +asio::io_service& IoServiceImpl::GetRaw() { + return io_service_; +} + +unsigned int IoServiceImpl::GetWorkerThreadCount() { + mutex_guard state_lock(state_lock_); + return worker_threads_.size(); + +} + + +} // namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.h new file mode 100644 index 0000000..a29985c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/ioservice_impl.h @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef COMMON_HDFS_IOSERVICE_H_ +#define COMMON_HDFS_IOSERVICE_H_ + +#include "hdfspp/ioservice.h" + +#include <asio/io_service.hpp> +#include "common/new_delete.h" + +#include <mutex> +#include <thread> + +namespace hdfs { + +/* + * A thin wrapper over the asio::io_service with a few extras + * -manages it's own worker threads + * -some helpers for sharing with multiple modules that need to do async work + */ + +class IoServiceImpl : public IoService { + public: + MEMCHECKED_CLASS(IoServiceImpl) + IoServiceImpl() {} + + unsigned int InitDefaultWorkers() override; + unsigned int InitWorkers(unsigned int thread_count) override; + void PostTask(std::function<void(void)> asyncTask) override; + void Run() override; + void Stop() override; + asio::io_service& GetRaw() override; + + // Add a single worker thread, in the common case try to avoid this in favor + // of Init[Default]Workers. Public for use by tests and rare cases where a + // client wants very explicit control of threading for performance reasons + // e.g. pinning threads to NUMA nodes. 
+ bool AddWorkerThread() override; + + unsigned int GetWorkerThreadCount() override; + + private: + std::mutex state_lock_; + ::asio::io_service io_service_; + + // For doing logging + resource manager updates on thread start/exit + void ThreadStartHook(); + void ThreadExitHook(); + + // Support for async worker threads + struct WorkerDeleter { + void operator()(std::thread *t); + }; + typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr; + std::vector<WorkerPtr> worker_threads_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h index 69f9c6e..4e66a93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h @@ -23,14 +23,11 @@ #include "hdfspp/log.h" -#include <iostream> #include <sstream> #include <mutex> #include <memory> #include <thread> -#include <asio/ip/tcp.hpp> - namespace hdfs { /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc index d29f1e9..a04daf1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc +++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.cc @@ -20,11 +20,12 @@ #include "common/util.h" #include "common/logging.h" +#include "hdfspp/ioservice.h" #include <sstream> #include <utility> #include <future> #include <memory> namespace hdfs { @@ -35,8 +36,6 @@ ResolvedNamenodeInfo& ResolvedNamenodeInfo::operator=(const NamenodeInfo &info) return *this; } - - std::string ResolvedNamenodeInfo::str() const { std::stringstream ss; ss << "ResolvedNamenodeInfo {nameservice: " << nameservice << ", name: " << name << ", uri: " << uri.str(); @@ -58,7 +57,7 @@ std::string ResolvedNamenodeInfo::str() const { } -bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info) { +bool ResolveInPlace(std::shared_ptr<IoService> ioservice, ResolvedNamenodeInfo &info) { // this isn't very memory friendly, but if it needs to be called often there are bigger issues at hand info.endpoints.clear(); std::vector<ResolvedNamenodeInfo> resolved = BulkResolve(ioservice, {info}); @@ -76,7 +75,7 @@ typedef std::vector<asio::ip::tcp::endpoint> endpoint_vector; // RAII wrapper class ScopedResolver { private: - ::asio::io_service *io_service_; + std::shared_ptr<IoService> io_service_; std::string host_; std::string port_; ::asio::ip::tcp::resolver::query query_; @@ -86,8 +85,9 @@ class ScopedResolver { // Caller blocks on access if resolution isn't finished std::shared_ptr<std::promise<Status>> result_status_; public: - ScopedResolver(::asio::io_service *service, const std::string &host, const std::string &port) : - io_service_(service), host_(host), port_(port), query_(host, port), resolver_(*io_service_) + ScopedResolver(std::shared_ptr<IoService> service, const std::string &host, const std::string &port) : + io_service_(service), host_(host), port_(port), query_(host, port), resolver_(io_service_->GetRaw()) { + // NOTE: resolver_'s initializer above already dereferenced io_service_, so the null check below cannot 
prevent a null dereference; callers must pass a non-null IoService. if(!io_service_) LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << " passed nullptr to io_service"); @@ 
-140,7 +140,7 @@ class ScopedResolver { } }; -std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes) { +std::vector<ResolvedNamenodeInfo> BulkResolve(std::shared_ptr<IoService> ioservice, const std::vector<NamenodeInfo> &nodes) { std::vector< std::unique_ptr<ScopedResolver> > resolvers; resolvers.reserve(nodes.size()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h index fdee8d7..f43690d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/namenode_info.h @@ -20,6 +20,7 @@ #define COMMON_HDFS_NAMENODE_INFO_H_ #include <asio.hpp> + #include <hdfspp/options.h> #include <string> @@ -27,6 +28,9 @@ namespace hdfs { +// Forward decl +class IoService; + // Internal representation of namenode info that keeps track // of its endpoints. struct ResolvedNamenodeInfo : public NamenodeInfo { @@ -38,11 +42,11 @@ struct ResolvedNamenodeInfo : public NamenodeInfo { // Clear endpoints if set and resolve all of them in parallel. // Only successful lookups will be placed in the result set. 
-std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes); +std::vector<ResolvedNamenodeInfo> BulkResolve(std::shared_ptr<IoService> ioservice, const std::vector<NamenodeInfo> &nodes); // Clear endpoints, if any, and resolve them again // Return true if endpoints were resolved -bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info); +bool ResolveInPlace(std::shared_ptr<IoService> ioservice, ResolvedNamenodeInfo &info); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc index 375f951..6a07987 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc @@ -19,17 +19,25 @@ #include "common/util.h" #include "common/util_c.h" +#include <google/protobuf/message_lite.h> #include <google/protobuf/io/zero_copy_stream_impl_lite.h> #include <exception> #include <sstream> -#include <iostream> #include <iomanip> #include <thread> namespace hdfs { +Status ToStatus(const ::asio::error_code &ec) { + if (ec) { + return Status(ec.value(), ec.message().c_str()); + } else { + return Status::OK(); + } +} + bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in, ::google::protobuf::MessageLite *msg) { uint32_t size = 0; @@ -60,6 +68,10 @@ std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageL return buf; } +int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg) { + size_t size = msg->ByteSize(); + return 
::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size; +} std::string GetRandomClientName() { std::vector<unsigned char>buf(8); http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h index be902bd..590ba54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h @@ -21,16 +21,20 @@ #include "hdfspp/status.h" #include "common/logging.h" -#include <sstream> #include <mutex> #include <string> #include <asio/error_code.hpp> +#include <asio/ip/tcp.hpp> // asio::ip::tcp::socket, used by SafeDisconnect() below #include <openssl/rand.h> - -#include <google/protobuf/message_lite.h> #include <google/protobuf/io/coded_stream.h> -#include <asio.hpp> + + +namespace google { + namespace protobuf { + class MessageLite; + } +} namespace hdfs { @@ -38,20 +42,11 @@ namespace hdfs { typedef std::lock_guard<std::mutex> mutex_guard; -static inline Status ToStatus(const ::asio::error_code &ec) { - if (ec) { - return Status(ec.value(), ec.message().c_str()); - } else { - return Status::OK(); - } -} +Status ToStatus(const ::asio::error_code &ec); // Determine size of buffer that needs to be allocated in order to serialize msg // in delimited format -static inline int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg) { - size_t size = msg->ByteSize(); - return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size; -} +int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg); // Construct msg from the input held in the CodedInputStream // return false on 
failure, otherwise return true @@ -84,7 +79,6 @@ bool lock_held(T & 
mutex) { std::string SafeDisconnect(asio::ip::tcp::socket *sock); - // The following helper function is used for classes that look like the following: // // template <typename socket_like_object> http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc index 27cd666..4142482 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc @@ -24,14 +24,14 @@ namespace hdfs { DataNodeConnection::~DataNodeConnection(){} DataNodeConnectionImpl::~DataNodeConnectionImpl(){} -DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service, - const ::hadoop::hdfs::DatanodeInfoProto &dn_proto, - const hadoop::common::TokenProto *token, - LibhdfsEvents *event_handlers) : event_handlers_(event_handlers) +DataNodeConnectionImpl::DataNodeConnectionImpl(std::shared_ptr<IoService> io_service, + const ::hadoop::hdfs::DatanodeInfoProto &dn_proto, + const hadoop::common::TokenProto *token, + LibhdfsEvents *event_handlers) : event_handlers_(event_handlers) { using namespace ::asio::ip; - conn_.reset(new tcp::socket(*io_service)); + conn_.reset(new tcp::socket(io_service->GetRaw())); auto datanode_addr = dn_proto.id(); endpoints_[0] = tcp::endpoint(address::from_string(datanode_addr.ipaddr()), datanode_addr.xferport()); @@ -68,5 +68,22 @@ void DataNodeConnectionImpl::Cancel() { } } +void DataNodeConnectionImpl::async_read_some(const MutableBuffer &buf, + std::function<void (const 
asio::error_code & error, std::size_t bytes_transferred) > handler) +{ + event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin()); // NOTE(review): end() - begin() counts the buffers in the sequence, not bytes; if a byte count is intended, asio::buffer_size(buf) gives it - TODO confirm + + mutex_guard state_lock(state_lock_); + conn_->async_read_some(buf, handler); +} + +void DataNodeConnectionImpl::async_write_some(const ConstBuffer &buf, + std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) +{ + event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin()); // NOTE(review): same as DN_read_req - this is a buffer count, not a byte count; confirm intent + + mutex_guard state_lock(state_lock_); + conn_->async_write_some(buf, handler); +} } http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h index 21193b3..a54338f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h @@ -18,7 +18,7 @@ #ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_ #define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_ -#include "common/hdfs_ioservice.h" +#include "hdfspp/ioservice.h" #include "common/async_stream.h" #include "ClientNamenodeProtocol.pb.h" #include "common/libhdfs_events_impl.h" @@ -58,13 +58,14 @@ private: // held (briefly) while posting async ops to the asio task queue std::mutex state_lock_; public: + 
MEMCHECKED_CLASS(DataNodeConnectionImpl) std::unique_ptr<asio::ip::tcp::socket, SocketDeleter> conn_; std::array<asio::ip::tcp::endpoint, 1> endpoints_; std::string uuid_; LibhdfsEvents *event_handlers_; virtual ~DataNodeConnectionImpl(); - 
DataNodeConnectionImpl(asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto, + DataNodeConnectionImpl(std::shared_ptr<IoService> io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto, const hadoop::common::TokenProto *token, LibhdfsEvents *event_handlers); @@ -72,24 +73,11 @@ public: void Cancel() override; - void async_read_some(const MutableBuffers &buf, - std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) - override { - event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin()); + void async_read_some(const MutableBuffer &buf, + std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) override; - - mutex_guard state_lock(state_lock_); - conn_->async_read_some(buf, handler); - }; - - void async_write_some(const ConstBuffers &buf, - std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) - override { - event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin()); - - mutex_guard state_lock(state_lock_); - conn_->async_write_some(buf, handler); - } + void async_write_some(const ConstBuffer &buf, + std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) override; }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc index ba702b0..02630fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc +++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc @@ -36,10 +36,10 @@ FileHandle::~FileHandle() {} 
FileHandleImpl::FileHandleImpl(const std::string & cluster_name, const std::string & path, - ::asio::io_service *io_service, const std::string &client_name, - const std::shared_ptr<const struct FileInfo> file_info, - std::shared_ptr<BadDataNodeTracker> bad_data_nodes, - std::shared_ptr<LibhdfsEvents> event_handlers) + std::shared_ptr<IoService> io_service, const std::string &client_name, + const std::shared_ptr<const struct FileInfo> file_info, + std::shared_ptr<BadDataNodeTracker> bad_data_nodes, + std::shared_ptr<LibhdfsEvents> event_handlers) : cluster_name_(cluster_name), path_(path), io_service_(io_service), client_name_(client_name), file_info_(file_info), bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()), event_handlers_(event_handlers), bytes_read_(0) { LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl(" @@ -167,7 +167,7 @@ bool FileHandleImpl::CheckSeekBounds(ssize_t desired_position) { * on the FileHandle */ void FileHandleImpl::AsyncPreadSome( - size_t offset, const MutableBuffers &buffers, + size_t offset, const MutableBuffer &buffer, std::shared_ptr<NodeExclusionRule> excluded_nodes, const std::function<void(const Status &, const std::string &, size_t)> handler) { using ::hadoop::hdfs::DatanodeInfoProto; @@ -233,7 +233,7 @@ void FileHandleImpl::AsyncPreadSome( uint64_t offset_within_block = offset - block->offset(); uint64_t size_within_block = std::min<uint64_t>( - block->b().numbytes() - offset_within_block, asio::buffer_size(buffers)); + block->b().numbytes() - offset_within_block, asio::buffer_size(buffer)); // bounded by both the rest of this block and the caller's buffer LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" << FMT_THIS_ADDR << "), ...) 
Datanode hostname=" << dnHostName << ", IP Address=" << dnIpAddr @@ -268,7 +268,7 @@ void FileHandleImpl::AsyncPreadSome( handler(status, dn_id, transferred); }; - auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name] + auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffer, reader, dn_id, client_name] (Status status, std::shared_ptr<DataNodeConnection> dn) { (void)dn; event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0); @@ -281,7 +281,7 @@ void FileHandleImpl::AsyncPreadSome( if (status.ok()) { reader->AsyncReadBlock( client_name, *block, offset_within_block, - asio::buffer(buffers, size_within_block), read_handler); + asio::buffer(buffer, size_within_block), read_handler); // view of buffer limited to at most size_within_block bytes } else { handler(status, dn_id, 0); } @@ -307,7 +307,7 @@ std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReader } std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection( - ::asio::io_service * io_service, + std::shared_ptr<IoService> io_service, const ::hadoop::hdfs::DatanodeInfoProto & dn, const hadoop::common::TokenProto * token) { LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateDataNodeConnection(" http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h index 4135156..57da237 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h +++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h @@ -18,7 +18,7 @@ #ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_ #define LIBHDFSPP_LIB_FS_FILEHANDLE_H_ -#include "common/hdfs_ioservice.h" +#include "hdfspp/ioservice.h" #include "common/async_stream.h" #include "common/cancel_tracker.h" #include "common/libhdfs_events_impl.h" @@ -26,12 +26,10 @@ #include "reader/fileinfo.h" #include "reader/readergroup.h" -#include "asio.hpp" #include "bad_datanode_tracker.h" #include "ClientNamenodeProtocol.pb.h" #include <mutex> -#include <iostream> namespace hdfs { @@ -53,7 +51,7 @@ public: MEMCHECKED_CLASS(FileHandleImpl) FileHandleImpl(const std::string & cluster_name, const std::string & path, - ::asio::io_service *io_service, const std::string &client_name, + std::shared_ptr<IoService> io_service, const std::string &client_name, const std::shared_ptr<const struct FileInfo> file_info, std::shared_ptr<BadDataNodeTracker> bad_data_nodes, std::shared_ptr<LibhdfsEvents> event_handlers); @@ -93,7 +91,7 @@ public: * If trying to begin a read past the EOF, status will be Status::InvalidOffset. 
* */ - void AsyncPreadSome(size_t offset, const MutableBuffers &buffers, + void AsyncPreadSome(size_t offset, const MutableBuffer &buffer, std::shared_ptr<NodeExclusionRule> excluded_nodes, const std::function<void(const Status &status, const std::string &dn_id, size_t bytes_read)> handler); @@ -124,13 +122,13 @@ protected: std::shared_ptr<DataNodeConnection> dn, std::shared_ptr<hdfs::LibhdfsEvents> event_handlers); virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection( - ::asio::io_service *io_service, + std::shared_ptr<IoService> io_service, const ::hadoop::hdfs::DatanodeInfoProto & dn, const hadoop::common::TokenProto * token); private: const std::string cluster_name_; const std::string path_; - ::asio::io_service * const io_service_; + const std::shared_ptr<IoService> io_service_; // const like the raw pointer it replaces: never reseated const std::string client_name_; const std::shared_ptr<const struct FileInfo> file_info_; std::shared_ptr<BadDataNodeTracker> bad_node_tracker_; http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index 56d02d8..41cc645 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -18,6 +18,7 @@ #include "filesystem.h" +#include "filehandle.h" #include "common/namenode_info.h" #include <functional> @@ -104,6 +105,54 @@ FileSystem *FileSystem::New() { * FILESYSTEM IMPLEMENTATION ****************************************************************************/ +struct 
FileSystemImpl::FindSharedState { + //Name pattern (can have wild-cards) to find + const std::string name; 
+ //Maximum depth to recurse after the end of path is reached. + //Can be set to 0 for pure path globbing and ignoring name pattern entirely. + const uint32_t maxdepth; + //Vector of all sub-directories from the path argument (each can have wild-cards) + std::vector<std::string> dirs; + //Callback from Find + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler; + //outstanding_requests is incremented once for every GetListing call. + std::atomic<uint64_t> outstanding_requests; + //Boolean needed to abort all recursion on error or on user command + std::atomic<bool> aborted; + //Shared variables will need protection with a lock + std::mutex lock; + FindSharedState(const std::string path_, const std::string name_, const uint32_t maxdepth_, + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler_, + uint64_t outstanding_requests_, bool aborted_) + : name(name_), + maxdepth(maxdepth_), + handler(handler_), + outstanding_requests(outstanding_requests_), + aborted(aborted_), + lock() { + //Constructing the list of sub-directories. The trailing '/' goes onto a copy of the string: + //writing "/" into a stringstream opened on path_ overwrote its first char, and back() on "" is UB. + std::string full_path(path_); + if(full_path.empty() || full_path.back() != '/') full_path += '/'; + std::stringstream ss(full_path); + for (std::string token; std::getline(ss, token, '/'); ) { + dirs.push_back(token); + } + } +}; + +struct FileSystemImpl::FindOperationalState { + const std::string path; + const uint32_t depth; + const bool search_path; + FindOperationalState(const std::string path_, const uint32_t depth_, const bool search_path_) + : path(path_), + depth(depth_), + search_path(search_path_) { + } +}; + + const std::string get_effective_user_name(const std::string &user_name) { if (!user_name.empty()) return user_name; @@ -134,10 +183,10 @@ const std::string get_effective_user_name(const std::string &user_name) { } 
FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name, const Options &options) : - io_service_(static_cast<IoServiceImpl *>(io_service)), options_(options), + 
io_service_(io_service), options_(options), client_name_(GetRandomClientName()), nn_( - &io_service_->io_service(), options, client_name_, + io_service_, options, client_name_, get_effective_user_name(user_name), kNamenodeProtocol, kNamenodeProtocolVersion ), @@ -166,10 +215,10 @@ FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_n } FileSystemImpl::FileSystemImpl(std::shared_ptr<IoService> io_service, const std::string& user_name, const Options &options) : - io_service_(std::static_pointer_cast<IoServiceImpl>(io_service)), options_(options), + io_service_(io_service), options_(options), client_name_(GetRandomClientName()), nn_( - &io_service_->io_service(), options, client_name_, + io_service_, options, client_name_, get_effective_user_name(user_name), kNamenodeProtocol, kNamenodeProtocolVersion ), @@ -178,7 +227,7 @@ FileSystemImpl::FileSystemImpl(std::shared_ptr<IoService> io_service, const std: { LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl(" << FMT_THIS_ADDR << ", shared IoService@" << io_service_.get() << ") called"); - int worker_thread_count = io_service_->get_worker_thread_count(); + int worker_thread_count = io_service_->GetWorkerThreadCount(); if(worker_thread_count < 1) { LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl IoService provided doesn't have any worker threads. 
" << "It needs at least 1 worker to connect to an HDFS cluster.") @@ -217,7 +266,7 @@ void FileSystemImpl::Connect(const std::string &server, auto name_service = options_.services.find(server); if(name_service != options_.services.end()) { cluster_name_ = name_service->first; - resolved_namenodes = BulkResolve(&io_service_->io_service(), name_service->second); + resolved_namenodes = BulkResolve(io_service_, name_service->second); } else { cluster_name_ = server + ":" + service; @@ -230,7 +279,7 @@ void FileSystemImpl::Connect(const std::string &server, handler(Status::Error(("Invalid namenode " + cluster_name_ + " in config").c_str()), this); } - resolved_namenodes = BulkResolve(&io_service_->io_service(), {tmp_info}); + resolved_namenodes = BulkResolve(io_service_, {tmp_info}); // NOTE(review): no return follows the "Invalid namenode" handler() call above, so execution reaches this BulkResolve and handler may be invoked again later - confirm } for(unsigned int i=0;i<resolved_namenodes.size();i++) { @@ -282,7 +331,7 @@ int FileSystemImpl::WorkerThreadCount() { if(!io_service_) { return -1; } else { - return io_service_->get_worker_thread_count(); + return io_service_->GetWorkerThreadCount(); } } @@ -339,7 +388,7 @@ void FileSystemImpl::Open( LOG_DEBUG(kFileSystem, << "Operation not allowed on standby datanode"); } } - handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, &io_service_->io_service(), client_name_, file_info, bad_node_tracker_, event_handlers_) + handler(stat, stat.ok() ? 
new FileHandleImpl(cluster_name_, path, io_service_, client_name_, file_info, bad_node_tracker_, event_handlers_) : nullptr); }); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h index f2e9abd..935e7c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -18,19 +18,18 @@ #ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_ #define LIBHDFSPP_LIB_FS_FILESYSTEM_H_ -#include "filehandle.h" -#include "hdfspp/hdfspp.h" +#include "namenode_operations.h" #include "fs/bad_datanode_tracker.h" -#include "reader/block_reader.h" +#include "hdfspp/hdfspp.h" #include "reader/fileinfo.h" -#include "asio.hpp" - #include <thread> -#include "namenode_operations.h" namespace hdfs { +class FileHandle; + + /* * FileSystem: The consumer's main point of interaction with the cluster as * a whole. @@ -48,6 +47,7 @@ public: MEMCHECKED_CLASS(FileSystemImpl) typedef std::function<void(const Status &, FileSystem *)> ConnectCallback; + // Note: Longer term it'd be cleaner to take an rvalue reference to a shared_ptr to make ownership explicit FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options); explicit FileSystemImpl(std::shared_ptr<IoService>, const std::string& user_name, const Options &options); ~FileSystemImpl() override; @@ -215,7 +215,7 @@ private: * A side effect of this is that requests may outlive the RpcEngine they * reference. 
**/ - std::shared_ptr<IoServiceImpl> io_service_; + std::shared_ptr<IoService> io_service_; const Options options_; const std::string client_name_; std::string cluster_name_; @@ -234,53 +234,11 @@ private: void GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more, std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler); - - struct FindSharedState { - //Name pattern (can have wild-cards) to find - const std::string name; - //Maximum depth to recurse after the end of path is reached. - //Can be set to 0 for pure path globbing and ignoring name pattern entirely. - const uint32_t maxdepth; - //Vector of all sub-directories from the path argument (each can have wild-cards) - std::vector<std::string> dirs; - //Callback from Find - const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler; - //outstanding_requests is incremented once for every GetListing call. - std::atomic<uint64_t> outstanding_requests; - //Boolean needed to abort all recursion on error or on user command - std::atomic<bool> aborted; - //Shared variables will need protection with a lock - std::mutex lock; - FindSharedState(const std::string path_, const std::string name_, const uint32_t maxdepth_, - const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler_, - uint64_t outstanding_recuests_, bool aborted_) - : name(name_), - maxdepth(maxdepth_), - handler(handler_), - outstanding_requests(outstanding_recuests_), - aborted(aborted_), - lock() { - //Constructing the list of sub-directories - std::stringstream ss(path_); - if(path_.back() != '/'){ - ss << "/"; - } - for (std::string token; std::getline(ss, token, '/'); ) { - dirs.push_back(token); - } - } - }; - - struct FindOperationalState { - const std::string path; - const uint32_t depth; - const bool search_path; - FindOperationalState(const std::string path_, const uint32_t depth_, const bool search_path_) 
- : path(path_), - depth(depth_), - search_path(search_path_) { - } - }; + /** + * Helper structs holding state for recursive Find (defined in filesystem.cc) + */ + struct FindSharedState; + struct FindOperationalState; void FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more, std::shared_ptr<FindOperationalState> current_state, std::shared_ptr<FindSharedState> shared_state); http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h index f4caa18..3470a48 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h @@ -42,7 +42,7 @@ namespace hdfs { class NameNodeOperations { public: MEMCHECKED_CLASS(NameNodeOperations) - NameNodeOperations(::asio::io_service *io_service, const Options &options, + NameNodeOperations(std::shared_ptr<IoService> io_service, const Options &options, const std::string &client_name, const std::string &user_name, const char *protocol_name, int protocol_version) : io_service_(io_service), @@ -119,7 +119,7 @@ private: static void DirectoryListingProtoToStatInfo(std::shared_ptr<std::vector<StatInfo>> stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl); static void GetFsStatsResponseProtoToFsInfo(hdfs::FsInfo & fs_info, const std::shared_ptr<::hadoop::hdfs::GetFsStatsResponseProto> & fs); - ::asio::io_service * io_service_; + std::shared_ptr<IoService> io_service_; // This is the only permanent owner of the RpcEngine, however the RPC layer // needs to reference count it prevent 
races during FileSystem destruction. http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc index ca7715d..90c02f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc @@ -431,7 +431,7 @@ private: std::shared_ptr<DataNodeConnection> shared_conn_; }; -void BlockReaderImpl::AsyncReadPacket(const MutableBuffers &buffers, +void BlockReaderImpl::AsyncReadPacket(const MutableBuffer &buffer, const std::function<void(const Status &, size_t bytes_transferred)> &handler) { assert(state_ != kOpen && "Not connected"); @@ -450,7 +450,7 @@ void BlockReaderImpl::AsyncReadPacket(const MutableBuffers &buffers, .Push(new ReadChecksum(this)) .Push(new ReadPadding(this)) .Push(new ReadData( - this, m->state().bytes_transferred, buffers)) + this, m->state().bytes_transferred, buffer)) .Push(new AckRead(this)); auto self = this->shared_from_this(); @@ -460,14 +460,14 @@ void BlockReaderImpl::AsyncReadPacket(const MutableBuffers &buffers, } -size_t BlockReaderImpl::ReadPacket(const MutableBuffers &buffers, Status *status) +size_t BlockReaderImpl::ReadPacket(const MutableBuffer &buffer, Status *status) { LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacket called"); size_t transferred = 0; auto done = std::make_shared<std::promise<void>>(); auto future = done->get_future(); - AsyncReadPacket(buffers, + AsyncReadPacket(buffer, [status, &transferred, done](const Status &stat, size_t t) { *status = stat; transferred = t; @@ -504,7 +504,7 @@ 
private: struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation { - ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer, size_t *transferred) + ReadBlockContinuation(BlockReader *reader, MutableBuffer buffer, size_t *transferred) : reader_(reader), buffer_(buffer), buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {} virtual void Run(const Next &next) override { @@ -517,7 +517,7 @@ struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation private: BlockReader *reader_; - const MutableBuffers buffer_; + const MutableBuffer buffer_; const size_t buffer_size_; size_t *transferred_; std::function<void(const Status &)> next_; @@ -542,7 +542,7 @@ void BlockReaderImpl::AsyncReadBlock( const std::string & client_name, const hadoop::hdfs::LocatedBlockProto &block, size_t offset, - const MutableBuffers &buffers, + const MutableBuffer &buffer, const std::function<void(const Status &, size_t)> handler) { LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadBlock(" @@ -551,10 +551,10 @@ void BlockReaderImpl::AsyncReadBlock( auto m = continuation::Pipeline<size_t>::Create(cancel_state_); size_t * bytesTransferred = &m->state(); - size_t size = asio::buffer_size(buffers); + size_t size = asio::buffer_size(buffer); m->Push(new RequestBlockContinuation(this, client_name, &block.b(), size, offset)) - .Push(new ReadBlockContinuation(this, buffers, bytesTransferred)); + .Push(new ReadBlockContinuation(this, buffer, bytesTransferred)); m->Run([handler] (const Status &status, const size_t totalBytesTransferred) { handler(status, totalBytesTransferred); http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h index b5cbdf5..167c57d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h @@ -72,11 +72,11 @@ public: virtual void AsyncReadBlock( const std::string & client_name, const hadoop::hdfs::LocatedBlockProto &block, size_t offset, - const MutableBuffers &buffers, + const MutableBuffer &buffer, const std::function<void(const Status &, size_t)> handler) = 0; virtual void AsyncReadPacket( - const MutableBuffers &buffers, + const MutableBuffer &buffer, const std::function<void(const Status &, size_t bytes_transferred)> &handler) = 0; virtual void AsyncRequestBlock( @@ -98,7 +98,7 @@ public: chunk_padding_bytes_(0), cancel_state_(cancel_state), event_handlers_(event_handlers.get()) {} virtual void AsyncReadPacket( - const MutableBuffers &buffers, + const MutableBuffer &buffer, const std::function<void(const Status &, size_t bytes_transferred)> &handler) override; virtual void AsyncRequestBlock( @@ -111,12 +111,12 @@ public: virtual void AsyncReadBlock( const std::string & client_name, const hadoop::hdfs::LocatedBlockProto &block, size_t offset, - const MutableBuffers &buffers, + const MutableBuffer &buffer, const std::function<void(const Status &, size_t)> handler) override; virtual void CancelOperation() override; - size_t ReadPacket(const MutableBuffers &buffers, Status *status); + size_t ReadPacket(const MutableBuffer &buffer, Status *status); Status RequestBlock( const std::string &client_name, http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h index 93103c5..2b36f59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h @@ -44,13 +44,13 @@ public: template <class Handler> void Handshake(const Handler &next); - void async_read_some(const MutableBuffers &buf, + void async_read_some(const MutableBuffer &buf, std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) override { stream_->async_read_some(buf, handler); } - void async_write_some(const ConstBuffers &buf, + void async_write_some(const ConstBuffer &buf, std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) override { stream_->async_write_some(buf, handler); http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc index e83a28c..242c6ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.cc @@ -35,7 +35,7 @@ static std::string format_endpoints(const std::vector<::asio::ip::tcp::endpoint> } HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers, - ::asio::io_service *ioservice, + std::shared_ptr<IoService> ioservice, std::shared_ptr<LibhdfsEvents> 
event_handlers) : enabled_(false), resolved_(false), ioservice_(ioservice), event_handlers_(event_handlers) http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h index cc34f51..032b1d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/namenode_tracker.h @@ -40,7 +40,7 @@ namespace hdfs { class HANamenodeTracker { public: HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers, - ::asio::io_service *ioservice, + std::shared_ptr<IoService> ioservice, std::shared_ptr<LibhdfsEvents> event_handlers_); virtual ~HANamenodeTracker(); @@ -66,7 +66,7 @@ class HANamenodeTracker { bool resolved_; // Keep service in case a second round of DNS lookup is required - ::asio::io_service *ioservice_; + std::shared_ptr<IoService> ioservice_; // Event handlers, for now this is the simplest place to catch all failover events // and push info out to client application. Possibly move into RPCEngine. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc index 356411e..9157476 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/request.cc @@ -20,6 +20,7 @@ #include "request.h" #include "rpc_engine.h" #include "sasl_protocol.h" +#include "hdfspp/ioservice.h" #include "RpcHeader.pb.h" #include "ProtobufRpcEngine.pb.h" @@ -118,7 +119,7 @@ Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, const std::string &m : engine_(engine), method_name_(method_name), call_id_(call_id), - timer_(engine->io_service()), + timer_(engine->io_service()->GetRaw()), handler_(std::move(handler)), retry_count_(engine->retry_policy() ? 0 : kNoRetry), failover_count_(0) @@ -129,7 +130,7 @@ Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, const std::string &m Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, Handler &&handler) : engine_(engine), call_id_(-1/*Handshake ID*/), - timer_(engine->io_service()), + timer_(engine->io_service()->GetRaw()), handler_(std::move(handler)), retry_count_(engine->retry_policy() ? 
0 : kNoRetry), failover_count_(0) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 9e54983..9f7b3bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -83,7 +83,7 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> { void SetAuthInfo(const AuthInfo& auth_info); std::weak_ptr<LockFreeRpcEngine> engine() { return engine_; } - ::asio::io_service *GetIoService(); + std::shared_ptr<IoService> GetIoService(); protected: struct Response { http://git-wip-us.apache.org/repos/asf/hadoop/blob/eefe2a14/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc index ee9b704..43111ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc @@ -70,27 +70,27 @@ RpcConnection::RpcConnection(std::shared_ptr<LockFreeRpcEngine> engine) : engine_(engine), connected_(kNotYetConnected) {} -::asio::io_service *RpcConnection::GetIoService() { +std::shared_ptr<IoService> RpcConnection::GetIoService() { 
std::shared_ptr<LockFreeRpcEngine> pinnedEngine = engine_.lock(); if(!pinnedEngine) { LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid RpcEngine"); return nullptr; } - return &pinnedEngine->io_service(); + return pinnedEngine->io_service(); } void RpcConnection::StartReading() { auto shared_this = shared_from_this(); - ::asio::io_service *service = GetIoService(); + std::shared_ptr<IoService> service = GetIoService(); if(!service) { LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid IoService"); return; } - service->post([shared_this, this] () { - OnRecvCompleted(::asio::error_code(), 0); - }); + service->PostLambda( + [shared_this, this] () { OnRecvCompleted(::asio::error_code(), 0); } + ); } void RpcConnection::HandshakeComplete(const Status &s) { @@ -164,13 +164,14 @@ void RpcConnection::ContextComplete(const Status &s) { void RpcConnection::AsyncFlushPendingRequests() { std::shared_ptr<RpcConnection> shared_this = shared_from_this(); - ::asio::io_service *service = GetIoService(); + std::shared_ptr<IoService> service = GetIoService(); if(!service) { LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid IoService"); return; } - service->post([shared_this, this]() { + std::function<void()> task = [shared_this, this]() + { std::lock_guard<std::mutex> state_lock(connection_state_lock_); LOG_TRACE(kRPC, << "RpcConnection::AsyncFlushPendingRequests called (connected=" << ToString(connected_) << ")"); @@ -178,7 +179,10 @@ void RpcConnection::AsyncFlushPendingRequests() { if (!outgoing_request_) { FlushPendingRequests(); } - }); + }; + + service->PostTask(task); + } Status RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) { @@ -228,15 +232,17 @@ Status RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) { return status; } - ::asio::io_service *service = GetIoService(); + std::shared_ptr<IoService> service = GetIoService(); if(!service) { LOG_ERROR(kRPC, << 
"RpcConnection@" << this << " attempted to access invalid IoService"); return Status::Error("RpcConnection attempted to access invalid IoService"); } - service->post([req, response, status]() { - req->OnResponseArrived(response->in.get(), status); // Never call back while holding a lock - }); + service->PostLambda( + [req, response, status]() { + req->OnResponseArrived(response->in.get(), status); // Never call back while holding a lock + } + ); return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org