Updated 'io::redirect()' to take an optional vector of callback hooks. These callback hooks will be invoked before passing any data read from the 'from' file descriptor on to the 'to' file descriptor.
Review: https://reviews.apache.org/r/54053 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b7937a68 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b7937a68 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b7937a68 Branch: refs/heads/master Commit: b7937a68367088f3c1f7c334307422c71737b1d7 Parents: c33ba20 Author: Kevin Klues <klue...@gmail.com> Authored: Wed Nov 23 22:48:09 2016 -0800 Committer: Jie Yu <yujie....@gmail.com> Committed: Thu Dec 1 10:11:45 2016 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/io.hpp | 16 ++++++++--- 3rdparty/libprocess/src/io.cpp | 36 ++++++++++++++++++++----- 3rdparty/libprocess/src/tests/io_tests.cpp | 17 +++++++++--- 3 files changed, 56 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b7937a68/3rdparty/libprocess/include/process/io.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp index eec5efd..f5489dc 100644 --- a/3rdparty/libprocess/include/process/io.hpp +++ b/3rdparty/libprocess/include/process/io.hpp @@ -115,7 +115,8 @@ Future<Nothing> write(int fd, const std::string& data); /** * Redirect output from the 'from' file descriptor to the 'to' file - * descriptor (or /dev/null if 'to' is None). + * descriptor (or /dev/null if 'to' is None). Optionally call a vector + * of callback hooks, passing them the data before it is written to 'to'. * * The 'to' and 'from' file descriptors will be duplicated so that the * file descriptors' lifetimes can be controlled within this function. @@ -125,10 +126,19 @@ Future<Nothing> write(int fd, const std::string& data); * descriptor is bad, or if the file descriptor cannot be duplicated, * set to close-on-exec, or made non-blocking. */ -Future<Nothing> redirect(int from, Option<int> to, size_t chunk = 4096); +Future<Nothing> redirect( + int from, + Option<int> to, + size_t chunk = 4096, + const std::vector<lambda::function<void(const std::string&)>>& hooks = {}); + #ifdef __WINDOWS__ // Version of this function compatible with Windows `HANDLE`. -Future<Nothing> redirect(HANDLE from, Option<int> to, size_t chunk = 4096); +Future<Nothing> redirect( + HANDLE from, + Option<int> to, + size_t chunk = 4096, + const std::vector<lambda::function<void(const std::string&)>>& hooks = {}); #endif // __WINDOWS__ http://git-wip-us.apache.org/repos/asf/mesos/blob/b7937a68/3rdparty/libprocess/src/io.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp index d930cee..e81f279 100644 --- a/3rdparty/libprocess/src/io.cpp +++ b/3rdparty/libprocess/src/io.cpp @@ -28,6 +28,7 @@ #include <stout/try.hpp> using std::string; +using std::vector; namespace process { namespace io { @@ -334,6 +335,7 @@ void _splice( int from, int to, size_t chunk, + const vector<lambda::function<void(const string&)>>& hooks, boost::shared_array<char> data, std::shared_ptr<Promise<Nothing>> promise) { @@ -362,12 +364,19 @@ void _splice( if (size == 0) { // EOF. promise->set(Nothing()); } else { + // Send the data to the redirect hooks. + foreach ( + const lambda::function<void(const string&)>& hook, + hooks) { + hook(string(data.get(), size)); + } + // Note that we always try and complete the write, even if a // discard has occurred on our future, in order to provide // semantics where everything read is written. The promise // will eventually be discarded in the next read. io::write(to, string(data.get(), size)) - .onReady([=]() { _splice(from, to, chunk, data, promise); }) + .onReady([=]() { _splice(from, to, chunk, hooks, data, promise); }) .onFailed([=](const string& message) { promise->fail(message); }) .onDiscarded([=]() { promise->discard(); }); } @@ -377,7 +386,11 @@ void _splice( } -Future<Nothing> splice(int from, int to, size_t chunk) +Future<Nothing> splice( + int from, + int to, + size_t chunk, + const vector<lambda::function<void(const string&)>>& hooks) { boost::shared_array<char> data(new char[chunk]); @@ -389,7 +402,7 @@ Future<Nothing> splice(int from, int to, size_t chunk) Future<Nothing> future = promise->future(); - _splice(from, to, chunk, data, promise); + _splice(from, to, chunk, hooks, data, promise); return future; } @@ -496,7 +509,11 @@ Future<Nothing> write(int fd, const string& data) } -Future<Nothing> redirect(int from, Option<int> to, size_t chunk) +Future<Nothing> redirect( + int from, + Option<int> to, + size_t chunk, + const vector<lambda::function<void(const string&)>>& hooks) { // Make sure we've got "valid" file descriptors. if (from < 0 || (to.isSome() && to.get() < 0)) { @@ -562,7 +579,7 @@ Future<Nothing> redirect(int from, Option<int> to, size_t chunk) } // NOTE: We wrap `os::close` in a lambda to disambiguate on Windows. - return internal::splice(from, to.get(), chunk) + return internal::splice(from, to.get(), chunk, hooks) .onAny([from]() { os::close(from); }) .onAny([to]() { os::close(to.get()); }); } @@ -571,12 +588,17 @@ Future<Nothing> redirect(int from, Option<int> to, size_t chunk) #ifdef __WINDOWS__ // NOTE: Ordinarily this would go in a Windows-specific header; we put it here // to avoid complex forward declarations. -Future<Nothing> redirect(HANDLE from, Option<int> to, size_t chunk) +Future<Nothing> redirect( + HANDLE from, + Option<int> to, + size_t chunk, + const vector<lambda::function<void(const string&)>>& hooks) { return redirect( _open_osfhandle(reinterpret_cast<intptr_t>(from), O_RDWR), to, - chunk); + chunk, + hooks); } #endif // __WINDOWS__ http://git-wip-us.apache.org/repos/asf/mesos/blob/b7937a68/3rdparty/libprocess/src/tests/io_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp b/3rdparty/libprocess/src/tests/io_tests.cpp index 66b610e..26365a5 100644 --- a/3rdparty/libprocess/src/tests/io_tests.cpp +++ b/3rdparty/libprocess/src/tests/io_tests.cpp @@ -322,7 +322,14 @@ TEST_F(IOTest, Redirect) ASSERT_SOME(os::nonblock(pipes[0])); ASSERT_SOME(os::nonblock(pipes[1])); - // Now write data to the pipe and splice to the file. + // Set up a redirect hook to also accumlate the data that we splice. + string accumulated = ""; + lambda::function<void(const string&)> hook = + [&accumulated](const string& data) { + accumulated += data; + }; + + // Now write data to the pipe and splice to the file and the redirect hook. string data = "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do " "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim " @@ -337,7 +344,7 @@ TEST_F(IOTest, Redirect) data.append(data); } - Future<Nothing> redirect = io::redirect(pipes[0], fd.get()); + Future<Nothing> redirect = io::redirect(pipes[0], fd.get(), 256, {hook}); // Closing the read end of the pipe and the file should not have any // impact as we dup the file descriptor. @@ -358,10 +365,14 @@ TEST_F(IOTest, Redirect) AWAIT_READY(redirect); - // Now make sure all the data is there! + // Now make sure all the data is in the file! Try<string> read = os::read(path.get()); ASSERT_SOME(read); EXPECT_EQ(data, read.get()); + + // Also make sure the data was properly + // accumulated in the redirect hook. + EXPECT_EQ(data, accumulated); }