Avoid leaking the TcpAcceptor job on reconfiguration: monitor the listening socket and react to its closure.
Every job that waits for Comm I/O must have an FD closure handler. Unfortunately, we currently cannot enforce that rule automatically. Alex.
Do not leak TcpAcceptor job on reconfiguration by monitoring and reacting to the listening socket closure. Every job that waits for Comm I/O must have a FD closure handler. Unfortunately, we currently cannot enforce that rule automatically. === modified file 'src/comm/TcpAcceptor.cc' --- src/comm/TcpAcceptor.cc 2014-04-14 06:50:31 +0000 +++ src/comm/TcpAcceptor.cc 2014-04-24 20:12:30 +0000 @@ -105,40 +105,46 @@ Comm::TcpAcceptor::doneAll() const { // stop when FD is closed if (!IsConnOpen(conn)) { return AsyncJob::doneAll(); } // stop when handlers are gone if (theCallSub == NULL) { return AsyncJob::doneAll(); } // open FD with handlers...keep accepting. return false; } void Comm::TcpAcceptor::swanSong() { debugs(5,5, HERE); unsubscribe("swanSong"); + if (IsConnOpen(conn)) { + if (closer_ != NULL) + comm_remove_close_handler(conn->fd, closer_); + conn->close(); + } + conn = NULL; AcceptLimiter::Instance().removeDead(this); AsyncJob::swanSong(); } const char * Comm::TcpAcceptor::status() const { if (conn == NULL) return "[nil connection]"; static char ipbuf[MAX_IPSTRLEN] = {'\0'}; if (ipbuf[0] == '\0') conn->local.toHostStr(ipbuf, MAX_IPSTRLEN); static MemBuf buf; buf.reset(); buf.Printf(" FD %d, %s",conn->fd, ipbuf); const char *jobStatus = AsyncJob::status(); @@ -165,42 +171,57 @@ Comm::TcpAcceptor::setListen() } if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) { #ifdef SO_ACCEPTFILTER struct accept_filter_arg afa; bzero(&afa, sizeof(afa)); debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on " << conn); xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name)); if (setsockopt(conn->fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0) debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); #elif defined(TCP_DEFER_ACCEPT) int seconds = 30; if (strncmp(Config.accept_filter, "data=", 5) == 0) seconds = atoi(Config.accept_filter + 5); if 
(setsockopt(conn->fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0) debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); #else debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS"); #endif } + + typedef CommCbMemFunT<Comm::TcpAcceptor, CommCloseCbParams> Dialer; + closer_ = JobCallback(5, 4, Dialer, this, Comm::TcpAcceptor::handleClosure); + comm_add_close_handler(conn->fd, closer_); } +/// called when listening descriptor is closed by an external force +/// such as clientHttpConnectionsClose() +void +Comm::TcpAcceptor::handleClosure(const CommCloseCbParams &io) +{ + closer_ = NULL; + conn = NULL; + Must(done()); +} + + /** * This private callback is called whenever a filedescriptor is ready * to dupe itself and fob off an accept()ed connection * * It will either do that accept operation. Or if there are not enough FD * available to do the clone safely will push the listening FD into a list * of deferred operations. The list gets kicked and the dupe/accept() actually * done later when enough sockets become available. */ void Comm::TcpAcceptor::doAccept(int fd, void *data) { try { debugs(5, 2, HERE << "New connection on FD " << fd); Must(isOpen(fd)); TcpAcceptor *afd = static_cast<TcpAcceptor*>(data); if (!okToAccept()) { AcceptLimiter::Instance().defer(afd); === modified file 'src/comm/TcpAcceptor.h' --- src/comm/TcpAcceptor.h 2013-10-25 00:13:46 +0000 +++ src/comm/TcpAcceptor.h 2014-04-24 20:12:30 +0000 @@ -1,29 +1,31 @@ #ifndef SQUID_COMM_TCPACCEPTOR_H #define SQUID_COMM_TCPACCEPTOR_H #include "base/AsyncJob.h" #include "base/CbcPointer.h" #include "base/Subscription.h" #include "comm/forward.h" #include "comm_err_t.h" +class CommCloseCbParams; + namespace Comm { class AcceptLimiter; /** * Listens on a Comm::Connection for new incoming connections and * emits an active Comm::Connection descriptor for the new client. 
* * Handles all event limiting required to quash inbound connection * floods within the global FD limits of available Squid_MaxFD and * client_ip_max_connections. * * Fills the emitted connection with all connection details able to * be looked up. Currently these are the local/remote IP:port details * and the listening socket transparent-mode flag. */ class TcpAcceptor : public AsyncJob { public: @@ -56,37 +58,41 @@ public: */ void acceptNext(); /// Call the subscribed callback handler with details about a new connection. void notify(const comm_err_t flag, const Comm::ConnectionPointer &details) const; /// errno code of the last accept() or listen() action if one occurred. int errcode; protected: friend class AcceptLimiter; int32_t isLimited; ///< whether this socket is delayed and on the AcceptLimiter queue. private: Subscription::Pointer theCallSub; ///< used to generate AsyncCalls handling our events. /// conn being listened on for new connections /// Reserved for read-only use. ConnectionPointer conn; + AsyncCall::Pointer closer_; ///< listen socket closure handler + /// Method to test if there are enough file descriptors to open a new client connection /// if not the accept() will be postponed static bool okToAccept(); /// Method callback for whenever an FD is ready to accept a client connection. static void doAccept(int fd, void *data); void acceptOne(); comm_err_t oldAccept(Comm::ConnectionPointer &details); void setListen(); + void handleClosure(const CommCloseCbParams &io); + CBDATA_CLASS2(TcpAcceptor); }; } // namespace Comm #endif /* SQUID_COMM_TCPACCEPTOR_H */