net/loolnb.cpp | 129 ++++++++++++++++++++++++++++++--------------------------- 1 file changed, 70 insertions(+), 59 deletions(-)
New commits:
commit 404c1fab379650bf9cde78ae1398c5800bb232e9
Author: Ashod Nakashian <ashod.nakash...@collabora.co.uk>
Date: Mon Feb 13 19:40:41 2017 -0500

    nb: poll wakeup pipe and simplified polling

    Change-Id: I2e688b985d4a9bf7cbe8eef5df10f67bfc96f91c
    Reviewed-on: https://gerrit.libreoffice.org/34233
    Reviewed-by: Ashod Nakashian <ashnak...@gmail.com>
    Tested-by: Ashod Nakashian <ashnak...@gmail.com>

diff --git a/net/loolnb.cpp b/net/loolnb.cpp
index e7ce4f0..78e61b1 100644
--- a/net/loolnb.cpp
+++ b/net/loolnb.cpp
@@ -306,91 +306,102 @@ class SocketPoll
 public:
     SocketPoll()
     {
+        // Create the wakeup fd.
+        if (::pipe2(_wakeup, O_CLOEXEC | O_NONBLOCK) == -1)
+        {
+            //FIXME: Can't have wakeup pipe, should we exit?
+            _wakeup[0] = -1;
+            _wakeup[1] = -1;
+        }
     }
-    void add(const std::shared_ptr<T>& socket)
-    {
-        const int fd = socket->fd();
-        pollfd poll;
-        memset(&poll, 0, sizeof(pollfd));
-        poll.fd = fd;
-        poll.events = (POLLIN | POLLOUT);
-
-        std::lock_guard<std::mutex> lock(_mutex);
-
-        _sockets.emplace(fd, socket);
-        _pollDesc.emplace_back(poll);
-    }
-
-    void remove(const std::shared_ptr<T>& socket)
+    /// Poll the sockets for available data to read or buffer to write.
+    void poll(const int timeoutMs, const std::function<bool(const std::shared_ptr<T>&, const int)>& handler)
     {
-        const int fd = socket->fd();
+        const size_t size = _pollSockets.size();
+        std::vector<pollfd> pollFds(size + 1); // + wakeup fd
-        std::lock_guard<std::mutex> lock(_mutex);
+        for (size_t i = 0; i < size; ++i)
+        {
+            pollFds[i].fd = _pollSockets[i]->fd();
+            pollFds[i].events = POLLIN | POLLOUT; //TODO: Get from the socket.
+            pollFds[i].revents = 0;
+        }
-        // Hold reference to socket so it doesn't
-        // close while we are in poll(2).
-        _socketsDead.emplace(fd, socket);
-        _sockets.erase(fd);
-    }
+        // Add the read-end of the wake pipe.
+        pollFds[size].fd = _wakeup[0];
+        pollFds[size].events = POLLIN;
+        pollFds[size].revents = 0;
-    /// Poll the sockets for available data to read or buffer to write.
-    void poll(const std::function<bool(const std::shared_ptr<T>&, const int)>& handler)
-    {
         int rc;
         do
         {
-            // See note in class doc.
-            rc = ::poll(&_pollDesc[0], _pollDesc.size(), 0);
+            rc = ::poll(&pollFds[0], pollFds.size(), timeoutMs);
         }
         while (rc < 0 && errno == EINTR);
-        for (const pollfd& poll : _pollDesc)
+        // Fire the callback and remove dead fds.
+        for (int i = static_cast<int>(size) - 1; i >= 0; --i)
         {
-            if (poll.revents)
+            if (pollFds[i].revents)
             {
-                std::lock_guard<std::mutex> lock(_mutex);
-                const auto it = _sockets.find(poll.fd);
-                if (it != _sockets.end() && it->second)
+                if (!handler(_pollSockets[i], pollFds[i].revents))
                 {
-                    if (!handler(it->second, poll.revents))
-                    {
-                        std::cout << "Removing: " << poll.fd << std::endl;
-                        _socketsDead.emplace(poll.fd, it->second);
-                        _sockets.erase(poll.fd);
-                    }
+                    std::cout << "Removing: " << pollFds[i].fd << std::endl;
+                    _pollSockets.erase(_pollSockets.begin() + i);
                 }
             }
         }
-        // Now clear the dead sockets to close/free them.
-        std::lock_guard<std::mutex> lock(_mutex);
-
-        // Remove the pollfd of these sockets as well.
-        size_t size = 0;
-        for (size_t i = 0; i < _pollDesc.size(); ++i)
+        if (pollFds[size].revents)
         {
-            const auto it = _socketsDead.find(_pollDesc[i].fd);
-            if (it != _socketsDead.end())
-            {
-                // Move to the end.
-                std::swap(_pollDesc[i], _pollDesc[_pollDesc.size() - 1]);
-            }
-            else
+            // Process new sockets first.
+            addNewSocketsToPoll();
+
+            // Clear the data.
+            int dump;
+            if (::read(_wakeup[0], &dump, sizeof(4)) == -1)
             {
-                ++size;
+                // Nothing to do.
             }
         }
+    }
+
+    /// Insert a new socket to be polled.
+    /// Sockets are removed only when the handler return false.
+    void insertNewSocket(const std::shared_ptr<Socket>& newSocket)
+    {
+        std::lock_guard<std::mutex> lock(_mutex);
+
+        _newSockets.emplace_back(newSocket);
+
+        // wakeup the main-loop.
+        if (::write(_wakeup[1], "w", 1) == -1)
+        {
+            // No wake up then.
+        }
+    }
+
+private:
+
+    /// Add the new sockets to list of those to poll.
+    void addNewSocketsToPoll()
+    {
+        std::lock_guard<std::mutex> lock(_mutex);
-        _pollDesc.resize(size);
-        _socketsDead.clear();
+        // Copy the new sockets over and clear.
+        _pollSockets.insert(_pollSockets.end(), _newSockets.begin(), _newSockets.end());
+        _newSockets.clear();
     }
 private:
+    /// main-loop wakeup pipe
+    int _wakeup[2];
+    /// The sockets we're controlling
+    std::vector<std::shared_ptr<Socket>> _pollSockets;
+    /// Protects _newSockets
     std::mutex _mutex;
-    std::map<int, std::shared_ptr<T>> _sockets;
-    std::map<int, std::shared_ptr<T>> _socketsDead;
-    std::vector<pollfd> _pollDesc;
+    std::vector<std::shared_ptr<Socket>> _newSockets;
 };
 /// Generic thread class.
@@ -482,7 +493,7 @@ int main(int argc, const char**)
     {
         while (!stop)
         {
-            poller.poll([](const std::shared_ptr<Socket>& socket, const int events)
+            poller.poll(5000, [](const std::shared_ptr<Socket>& socket, const int events)
             {
                 if (events & POLLIN)
                 {
@@ -550,7 +561,7 @@ int main(int argc, const char**)
             }
             std::cout << "Accepted client #" << clientSocket->fd() << std::endl;
-            poller.add(clientSocket);
+            poller.insertNewSocket(clientSocket);
         }
     }
_______________________________________________
Libreoffice-commits mailing list
libreoffice-comm...@lists.freedesktop.org
https://lists.freedesktop.org/mailman/listinfo/libreoffice-commits