net/loolnb.cpp | 210 +++++++++++++++++++++++++++++++-------------------------- 1 file changed, 117 insertions(+), 93 deletions(-)
New commits: commit b9f4bed8ea75138ed04a66ffc6ccfd76396cc832 Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk> Date: Mon Feb 13 21:49:59 2017 -0500 nb: separate Socket into ClientSocket and ServerSocket Change-Id: I1aafd6192b955e51b8f1e74c1aad5fc3603f71d6 Reviewed-on: https://gerrit.libreoffice.org/34237 Reviewed-by: Ashod Nakashian <ashnak...@gmail.com> Tested-by: Ashod Nakashian <ashnak...@gmail.com> diff --git a/net/loolnb.cpp b/net/loolnb.cpp index 07c5c82..0ca4faf 100644 --- a/net/loolnb.cpp +++ b/net/loolnb.cpp @@ -127,87 +127,6 @@ public: return rc; } - /// Connect to a server address. - /// Does not retry on error. - /// timeoutMs can be 0 to avoid waiting, or -1 to wait forever. - /// Returns true on success only. - /// Note: when succceeds, caller must check for - /// EINPROGRESS and poll for write, then getError(), - /// only when the latter returns 0 we are connected. - bool connect(const SocketAddress& address, const int timeoutMs = 0) - { - const int rc = ::connect(_fd, address.addr(), address.length()); - if (rc == 0) - { - return true; - } - - if (errno != EINPROGRESS) - { - return false; - } - - // Wait for writable, then check again. - pollWrite(timeoutMs); - - // Now check if we connected, not, or not yet. - return (getError() == 0 || errno == EINPROGRESS); - } - - /// Binds to a local address (Servers only). - /// Does not retry on error. - /// Returns true on success only. - bool bind(const SocketAddress& address) - { - // Enable address reuse to avoid stalling after - // recycling, when previous socket is TIME_WAIT. - //TODO: Might be worth refactoring out. - const int reuseAddress = 1; - constexpr unsigned int len = sizeof(reuseAddress); - ::setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuseAddress, len); - - const int rc = ::bind(_fd, address.addr(), address.length()); - return (rc == 0); - } - - /// Listen to incoming connections (Servers only). - /// Does not retry on error. - /// Returns true on success only. 
- bool listen(const int backlog = 64) - { - const int rc = ::listen(_fd, backlog); - return (rc == 0); - } - - /// Accepts an incoming connection (Servers only). - /// Does not retry on error. - /// Returns a valid Socket shared_ptr on success only. - std::shared_ptr<Socket> accept() - { - // Accept a connection (if any) and set it to non-blocking. - // We don't care about the client's address, so ignored. - const int rc = ::accept4(_fd, nullptr, nullptr, SOCK_NONBLOCK); - return std::shared_ptr<Socket>(rc != -1 ? new Socket(rc) : nullptr); - } - - /// Send data to our peer. - /// Returns the number of bytes sent, -1 on error. - int send(const void* buf, const size_t len) - { - // Don't SIGPIPE when the other end closes. - const int rc = ::send(_fd, buf, len, MSG_NOSIGNAL); - return rc; - } - - /// Receive data from our peer. - /// Returns the number of bytes received, -1 on error, - /// and 0 when the peer has performed an orderly shutdown. - int recv(void* buf, const size_t len) - { - const int rc = ::recv(_fd, buf, len, 0); - return rc; - } - /// Poll the socket for either read, write, or both. /// Returns -1 on failure/error (query socket error), 0 for timeout, /// otherwise, depending on events, the respective bits set. @@ -219,7 +138,7 @@ public: pollfd poll; memset(&poll, 0, sizeof(poll)); - poll.fd = _fd; + poll.fd = fd(); poll.events |= events; int rc; @@ -277,7 +196,7 @@ public: return (rc > 0 && (rc & POLLOUT)); } -private: +protected: /// Construct based on an existing socket fd. /// Used by accept() only. @@ -290,6 +209,111 @@ private: const int _fd; }; +/// A non-blocking, client socket. +class ClientSocket : public Socket +{ +public: + ClientSocket() : + Socket() + { + } + + /// Connect to a server address. + /// Does not retry on error. + /// timeoutMs can be 0 to avoid waiting, or -1 to wait forever. + /// Returns true on success only. 
+ /// Note: when succeeds, caller must check for + /// EINPROGRESS and poll for write, then getError(), + /// only when the latter returns 0 we are connected. + bool connect(const SocketAddress& address, const int timeoutMs = 0) + { + const int rc = ::connect(fd(), address.addr(), address.length()); + if (rc == 0) + { + return true; + } + + if (errno != EINPROGRESS) + { + return false; + } + + // Wait for writable, then check again. + pollWrite(timeoutMs); + + // Now check if we connected, not, or not yet. + return (getError() == 0 || errno == EINPROGRESS); + } + + /// Send data to our peer. + /// Returns the number of bytes sent, -1 on error. + int send(const void* buf, const size_t len) + { + // Don't SIGPIPE when the other end closes. + const int rc = ::send(fd(), buf, len, MSG_NOSIGNAL); + return rc; + } + + /// Receive data from our peer. + /// Returns the number of bytes received, -1 on error, + /// and 0 when the peer has performed an orderly shutdown. + int recv(void* buf, const size_t len) + { + const int rc = ::recv(fd(), buf, len, 0); + return rc; + } + +protected: + ClientSocket(const int fd) : + Socket(fd) + { + } + + friend class ServerSocket; +}; + +/// A non-blocking, streaming socket. +class ServerSocket : public Socket +{ +public: + + /// Binds to a local address (Servers only). + /// Does not retry on error. + /// Returns true on success only. 
+ bool listen(const int backlog = 64) + { + const int rc = ::listen(fd(), backlog); + return (rc == 0); + } + + /// Accepts an incoming connection (Servers only). + /// Does not retry on error. + /// Returns a valid ClientSocket shared_ptr on success only. + std::shared_ptr<ClientSocket> accept() + { + // Accept a connection (if any) and set it to non-blocking. + // We don't care about the client's address, so ignored. + const int rc = ::accept4(fd(), nullptr, nullptr, SOCK_NONBLOCK); + return std::shared_ptr<ClientSocket>(rc != -1 ? new ClientSocket(rc) : nullptr); + } +}; + /// Handles non-blocking socket event polling. /// Only polls on N-Sockets and invokes callback and /// doesn't manage buffers or client data. @@ -373,7 +397,7 @@ public: /// Insert a new socket to be polled. /// Sockets are removed only when the handler return false. - void insertNewSocket(const std::shared_ptr<Socket>& newSocket) + void insertNewSocket(const std::shared_ptr<ClientSocket>& newSocket) { std::lock_guard<std::mutex> lock(_mutex); @@ -422,10 +446,10 @@ private: /// main-loop wakeup pipe int _wakeup[2]; /// The sockets we're controlling - std::vector<std::shared_ptr<Socket>> _pollSockets; + std::vector<std::shared_ptr<ClientSocket>> _pollSockets; /// Protects _newSockets std::mutex _mutex; - std::vector<std::shared_ptr<Socket>> _newSockets; + std::vector<std::shared_ptr<ClientSocket>> _newSockets; /// The fds to poll. std::vector<pollfd> _pollFds; }; @@ -468,7 +492,7 @@ SocketAddress addr("127.0.0.1", PortNumber); void client(const int timeoutMs) { - const auto client = std::make_shared<Socket>(); + const auto client = std::make_shared<ClientSocket>(); if (!client->connect(addr, timeoutMs) && errno != EINPROGRESS) { const std::string msg = "Failed to call connect. (errno: "; @@ -498,10 +522,10 @@ void client(const int timeoutMs) } } -void server(SocketPoll<Socket>& poller) +void server(SocketPoll<ClientSocket>& poller) { // Start server. 
- auto server = std::make_shared<Socket>(); + auto server = std::make_shared<ServerSocket>(); if (!server->bind(addr)) { const std::string msg = "Failed to bind. (errno: "; @@ -519,7 +543,7 @@ void server(SocketPoll<Socket>& poller) { if (server->pollRead(30000)) { - std::shared_ptr<Socket> clientSocket = server->accept(); + std::shared_ptr<ClientSocket> clientSocket = server->accept(); if (!clientSocket) { const std::string msg = "Failed to accept. (errno: "; @@ -533,11 +557,11 @@ void server(SocketPoll<Socket>& poller) } /// Poll client sockets and do IO. -void pollAndComm(SocketPoll<Socket>& poller, std::atomic<bool>& stop) +void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop) { while (!stop) { - poller.poll(5000, [](const std::shared_ptr<Socket>& socket, const int events) + poller.poll(5000, [](const std::shared_ptr<ClientSocket>& socket, const int events) { if (events & POLLIN) { @@ -588,7 +612,7 @@ int main(int argc, const char**) } // Used to poll client sockets. - SocketPoll<Socket> poller; + SocketPoll<ClientSocket> poller; // Start the client polling thread. Thread threadPoll([&poller](std::atomic<bool>& stop) _______________________________________________ Libreoffice-commits mailing list libreoffice-comm...@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits