Updated Branches: refs/heads/master 2b2cb3406 -> 9d5c4e0a6
Added os::sendfile that masks SIGPIPE. Review: https://reviews.apache.org/r/12619 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/14d22143 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/14d22143 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/14d22143 Branch: refs/heads/master Commit: 14d2214395948b83ca8c989aa18b077994dc800c Parents: 2b2cb34 Author: Benjamin Mahler <bmah...@twitter.com> Authored: Wed Jun 26 16:57:17 2013 -0700 Committer: Benjamin Mahler <bmah...@twitter.com> Committed: Mon Aug 5 12:38:05 2013 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/3rdparty/Makefile.am | 2 + 3rdparty/libprocess/3rdparty/stout/Makefile.am | 4 + .../3rdparty/stout/include/stout/os.hpp | 16 +- .../stout/include/stout/os/sendfile.hpp | 50 +++++++ .../3rdparty/stout/include/stout/os/signals.hpp | 150 +++++++++++++++++++ .../3rdparty/stout/tests/os/sendfile_tests.cpp | 84 +++++++++++ .../3rdparty/stout/tests/os/signals_tests.cpp | 34 +++++ 7 files changed, 333 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/14d22143/3rdparty/libprocess/3rdparty/Makefile.am ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/3rdparty/Makefile.am b/3rdparty/libprocess/3rdparty/Makefile.am index 5ade440..0cd407c 100644 --- a/3rdparty/libprocess/3rdparty/Makefile.am +++ b/3rdparty/libprocess/3rdparty/Makefile.am @@ -131,6 +131,8 @@ stout_tests_SOURCES = \ $(STOUT)/tests/protobuf_tests.pb.cc \ $(STOUT)/tests/protobuf_tests.pb.h \ $(STOUT)/tests/protobuf_tests.proto \ + $(STOUT)/tests/os/sendfile_tests.cpp \ + $(STOUT)/tests/os/signals_tests.cpp \ $(STOUT)/tests/strings_tests.cpp \ $(STOUT)/tests/thread_tests.cpp \ $(STOUT)/tests/uuid_tests.cpp http://git-wip-us.apache.org/repos/asf/mesos/blob/14d22143/3rdparty/libprocess/3rdparty/stout/Makefile.am ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/3rdparty/stout/Makefile.am b/3rdparty/libprocess/3rdparty/stout/Makefile.am index 2748584..e764fe2 100644 --- a/3rdparty/libprocess/3rdparty/stout/Makefile.am +++ b/3rdparty/libprocess/3rdparty/stout/Makefile.am @@ -40,6 +40,8 @@ EXTRA_DIST = \ include/stout/os/ls.hpp \ include/stout/os/osx.hpp \ include/stout/os/process.hpp \ + include/stout/os/sendfile.hpp \ + include/stout/os/signals.hpp \ include/stout/os/sysctl.hpp \ include/stout/owned.hpp \ include/stout/path.hpp \ @@ -65,6 +67,8 @@ EXTRA_DIST = \ tests/multimap_tests.cpp \ tests/none_tests.cpp \ tests/os_tests.cpp \ + tests/os/sendfile_tests.cpp \ + tests/os/signals_tests.cpp \ tests/proc_tests.cpp \ tests/protobuf_tests.cpp \ tests/protobuf_tests.pb.cc \ http://git-wip-us.apache.org/repos/asf/mesos/blob/14d22143/3rdparty/libprocess/3rdparty/stout/include/stout/os.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/os.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/os.hpp index 4290396..f159c79 100644 --- a/3rdparty/libprocess/3rdparty/stout/include/stout/os.hpp +++ b/3rdparty/libprocess/3rdparty/stout/include/stout/os.hpp @@ -23,16 +23,13 @@ #ifdef __linux__ #include <linux/version.h> -#endif +#endif // __linux__ #include <sys/stat.h> #include <sys/statvfs.h> -#ifdef __APPLE__ -#include <sys/sysctl.h> -#endif #ifdef __linux__ #include <sys/sysinfo.h> -#endif +#endif // __linux__ #include <sys/types.h> #include <sys/utsname.h> @@ -57,11 +54,16 @@ #include <stout/os/killtree.hpp> #ifdef __linux__ #include <stout/os/linux.hpp> -#endif +#endif // __linux__ #include <stout/os/ls.hpp> #ifdef __APPLE__ #include <stout/os/osx.hpp> -#endif +#endif // __APPLE__ +#include <stout/os/sendfile.hpp> +#include <stout/os/signals.hpp> +#ifdef __APPLE__ +#include <stout/os/sysctl.hpp> +#endif // __APPLE__ #ifdef __APPLE__ // Assigning the result pointer to ret silences an unused var warning. http://git-wip-us.apache.org/repos/asf/mesos/blob/14d22143/3rdparty/libprocess/3rdparty/stout/include/stout/os/sendfile.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/os/sendfile.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/os/sendfile.hpp new file mode 100644 index 0000000..b41ba63 --- /dev/null +++ b/3rdparty/libprocess/3rdparty/stout/include/stout/os/sendfile.hpp @@ -0,0 +1,50 @@ +#ifndef __STOUT_OS_SENDFILE_HPP__ +#define __STOUT_OS_SENDFILE_HPP__ + +#include <errno.h> + +#ifdef __linux__ +#include <sys/sendfile.h> +#endif // __linux__ +#ifdef __APPLE__ +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/uio.h> +#endif // __APPLE__ + +#include <stout/os/signals.hpp> + +namespace os { + +// Returns the amount of bytes written from the input file +// descriptor to the output socket. On error, returns -1 and +// errno indicates the error. +// NOTE: The following limitations exist because of the OS X +// implementation of sendfile: +// 1. s must be a stream oriented socket descriptor. +// 2. fd must be a regular file descriptor. +inline ssize_t sendfile(int s, int fd, off_t offset, size_t length) +{ +#ifdef __linux__ + suppress (SIGPIPE) { + // This will set errno to EPIPE if a SIGPIPE occurs. + return ::sendfile(s, fd, &offset, length); + } +#elif defined __APPLE__ + // On OS X, sendfile does not need to have SIGPIPE suppressed. + off_t _length = static_cast<off_t>(length); + + if (::sendfile(fd, s, offset, &_length, NULL, 0) < 0) { + if (errno == EAGAIN && _length > 0) { + return _length; + } + return -1; + } + + return _length; +#endif // __APPLE__ +} + +} // namespace os { + +#endif // __STOUT_OS_SENDFILE_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/14d22143/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp new file mode 100644 index 0000000..215ee55 --- /dev/null +++ b/3rdparty/libprocess/3rdparty/stout/include/stout/os/signals.hpp @@ -0,0 +1,150 @@ +#ifndef __STOUT_OS_SIGNALS_HPP__ +#define __STOUT_OS_SIGNALS_HPP__ + +#include <errno.h> +#include <pthread.h> +#include <signal.h> +#include <unistd.h> + +namespace os { + +namespace signals { + +// Returns true iff the signal is pending. +inline bool pending(int signal) +{ + sigset_t set; + sigemptyset(&set); + sigpending(&set); + return sigismember(&set, signal); +} + + +// Returns true if the signal has been blocked, or false if the +// signal was already blocked. +inline bool block(int signal) +{ + sigset_t set; + sigaddset(&set, signal); + + sigset_t oldset; + sigemptyset(&oldset); + + // We ignore errors here as the only documented one is + // EINVAL due to a bad value of the SIG_* argument. + pthread_sigmask(SIG_BLOCK, &set, &oldset); + + return !sigismember(&oldset, signal); +} + + +// Returns true if the signal has been unblocked, or false if the +// signal was not previously blocked. +inline bool unblock(int signal) +{ + sigset_t set; + sigaddset(&set, signal); + + sigset_t oldset; + sigemptyset(&oldset); + + pthread_sigmask(SIG_UNBLOCK, &set, &oldset); + + return sigismember(&oldset, signal); +} + +namespace internal { + +// Suppresses a signal on the current thread for the lifetime of +// the Suppressor. The signal *must* be synchronous and delivered +// per-thread. The suppression occurs only on the thread of +// execution of the Suppressor. +struct Suppressor +{ + Suppressor(int _signal) + : signal(_signal), pending(false), unblock(false) + { + // Check to see if the signal is already reported as pending. + // If pending, it means the thread already blocks the signal! + // Therefore, any new instances of the signal will also be + // blocked and merged with the pending one since there is no + // queuing for signals. + pending = signals::pending(signal); + + if (!pending) { + // Block the signal for this thread only. If already blocked, + // there's no need to unblock it. + unblock = signals::block(signal); + } + } + + ~Suppressor() + { + // We want to preserve errno when the Suppressor drops out of + // scope. Otherwise, one needs to potentially store errno when + // using the suppress() macro. + int _errno = errno; + + // If the signal has become pending after we blocked it, we + // need to clear it before unblocking it. + if (!pending && signals::pending(signal)) { + // It is possible that in between having observed the pending + // signal with sigpending() and clearing it with sigwait(), + // the signal was delivered to another thread before we were + // able to clear it here. This can happen if the signal was + // generated for the whole process (e.g. a kill was issued). + // See 2.4.1 here: + // http://pubs.opengroup.org/onlinepubs/009695399/functions/xsh_chap02_04.html + // To handle the above scenario, one can either: + // 1. Use sigtimedwait() with a timeout of 0, to ensure we + // don't block forever. However, this only works on Linux + // and we may still swallow the signal intended for the + // process. + // 2. After seeing the pending signal, signal ourselves with + // pthread_kill prior to calling sigwait(). This can still + // swallow the signal intended for the process. + // We chose to use the latter technique as it works on all + // POSIX systems and is less likely to swallow process signals, + // provided the thread signal and process signal are not merged. + pthread_kill(pthread_self(), signal); + + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, signal); + + int result; + do { + int _ignored; + result = sigwait(&mask, &_ignored); + } while (result == -1 && errno == EINTR); + } + + // Unblock the signal (only if we were the ones to block it). + if (unblock) { + signals::unblock(signal); + } + + // Restore errno. + errno = _errno; + } + + // Needed for the suppress() macro. + operator bool () { return true; } + +private: + const int signal; + bool pending; // Whether the signal is already pending. + bool unblock; // Whether to unblock the signal on destruction. +}; + +} // namespace internal { + +#define suppress(signal) \ + if (os::signals::internal::Suppressor suppressor ## signal = \ + os::signals::internal::Suppressor(signal)) + +} // namespace signals { + +} // namespace os { + +#endif // __STOUT_OS_SIGNALS_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/14d22143/3rdparty/libprocess/3rdparty/stout/tests/os/sendfile_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/3rdparty/stout/tests/os/sendfile_tests.cpp b/3rdparty/libprocess/3rdparty/stout/tests/os/sendfile_tests.cpp new file mode 100644 index 0000000..194906e --- /dev/null +++ b/3rdparty/libprocess/3rdparty/stout/tests/os/sendfile_tests.cpp @@ -0,0 +1,84 @@ +#include <gmock/gmock.h> + +#include <gtest/gtest.h> + +#include <stout/gtest.hpp> +#include <stout/os.hpp> +#include <stout/path.hpp> + +using std::string; + +// TODO(bmahler): Extend from OsTest. +class OsSendfileTest : public ::testing::Test +{ +public: + OsSendfileTest() + : LOREM_IPSUM( + "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do " + "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim " + "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut " + "aliquip ex ea commodo consequat. Duis aute irure dolor in " + "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla " + "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in " + "culpa qui officia deserunt mollit anim id est laborum.") {} + +protected: + virtual void SetUp() + { + const Try<string>& mkdtemp = os::mkdtemp(); + ASSERT_SOME(mkdtemp); + tmpdir = mkdtemp.get(); + filename = path::join(mkdtemp.get(), "lorem.txt"); + + ASSERT_SOME(os::write(filename, LOREM_IPSUM)); + } + + virtual void TearDown() + { + ASSERT_SOME(os::rmdir(tmpdir)); + } + + const string LOREM_IPSUM; + string filename; + +private: + string tmpdir; +}; + + +TEST_F(OsSendfileTest, sendfile) +{ + Try<int> fd = os::open(filename, O_RDONLY); + ASSERT_SOME(fd); + + // Construct a socket pair and use sendfile to transmit the text. + int s[2]; + ASSERT_NE(-1, socketpair(AF_UNIX, SOCK_STREAM, 0, s)) << strerror(errno); + ASSERT_EQ( + LOREM_IPSUM.size(), + os::sendfile(s[0], fd.get(), 0, LOREM_IPSUM.size())); + + char* buffer = new char[LOREM_IPSUM.size()]; + ASSERT_EQ(LOREM_IPSUM.size(), read(s[1], buffer, LOREM_IPSUM.size())); + ASSERT_EQ(LOREM_IPSUM, string(buffer, LOREM_IPSUM.size())); + ASSERT_SOME(os::close(fd.get())); + delete buffer; + + // Now test with a closed socket, the SIGPIPE should be suppressed! + fd = os::open(filename, O_RDONLY); + ASSERT_SOME(fd); + ASSERT_SOME(os::close(s[1])); + + ssize_t result = os::sendfile(s[0], fd.get(), 0, LOREM_IPSUM.size()); + int _errno = errno; + ASSERT_EQ(-1, result); + +#ifdef __linux__ + ASSERT_EQ(EPIPE, _errno) << strerror(_errno); +#elif defined __APPLE__ + ASSERT_EQ(ENOTCONN, _errno) << strerror(_errno); +#endif + + ASSERT_SOME(os::close(fd.get())); + ASSERT_SOME(os::close(s[0])); +} http://git-wip-us.apache.org/repos/asf/mesos/blob/14d22143/3rdparty/libprocess/3rdparty/stout/tests/os/signals_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/3rdparty/stout/tests/os/signals_tests.cpp b/3rdparty/libprocess/3rdparty/stout/tests/os/signals_tests.cpp new file mode 100644 index 0000000..66caa04 --- /dev/null +++ b/3rdparty/libprocess/3rdparty/stout/tests/os/signals_tests.cpp @@ -0,0 +1,34 @@ +#include <errno.h> + +#include <gmock/gmock.h> + +#include <gtest/gtest.h> + +#include <stout/gtest.hpp> +#include <stout/os.hpp> + +using std::string; + +// TODO(bmahler): Expose OsTest so this can use it. +class OsSignalsTest : public ::testing::Test {}; + + +TEST_F(OsSignalsTest, suppress) +{ + int pipes[2]; + ASSERT_NE(-1, pipe(pipes)); + + ASSERT_SOME(os::close(pipes[0])); + + const string data = "hello"; + + // Let's make sure we can suppress SIGPIPE! + suppress(SIGPIPE) { + // Writing to a pipe that has been closed generates SIGPIPE. + ASSERT_EQ(-1, write(pipes[1], data.c_str(), data.length())); + + ASSERT_EQ(EPIPE, errno); + } + + ASSERT_SOME(os::close(pipes[1])); +}