On 19/09/10 23:35, Amos Jeffries wrote:
Round 2 of only src/comm/ changes.

I believe this accounts for all the bits requested to date, including
adding Subscriptions for ConnAcceptor.

Amos

... and here is the patch. :(


Amos
=== modified file 'src/comm/AcceptLimiter.cc'
--- src/comm/AcceptLimiter.cc	2009-12-31 02:35:01 +0000
+++ src/comm/AcceptLimiter.cc	2010-09-18 02:09:49 +0000
@@ -1,6 +1,7 @@
 #include "config.h"
 #include "comm/AcceptLimiter.h"
-#include "comm/ListenStateData.h"
+#include "comm/ConnAcceptor.h"
+#include "comm/Connection.h"
 #include "fde.h"
 
 Comm::AcceptLimiter Comm::AcceptLimiter::Instance_;
@@ -11,22 +12,41 @@
 }
 
 void
-Comm::AcceptLimiter::defer(Comm::ListenStateData *afd)
+Comm::AcceptLimiter::defer(Comm::ConnAcceptor *afd)
 {
     afd->isLimited++;
-    debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
+    debugs(5, 5, HERE << afd->conn << " x" << afd->isLimited);
     deferred.push_back(afd);
 }
 
 void
+Comm::AcceptLimiter::removeDead(const Comm::ConnAcceptor *afd)
+{
+    for (unsigned int i = 0; i < deferred.size() && afd->isLimited > 0; i++) {
+        if (deferred[i] == afd) {
+            deferred[i]->isLimited--;
+            deferred[i] = NULL; // fast. kick() will skip empty entries later.
+            debugs(5, 5, HERE << afd->conn << " x" << afd->isLimited);
+        }
+    }
+}
+
+void
 Comm::AcceptLimiter::kick()
 {
+    // TODO: this could be optimized further with an iterator to search
+    //       looking for first non-NULL, followed by dumping the first N
+    //       with only one shift()/pop_front operation
+
     debugs(5, 5, HERE << " size=" << deferred.size());
-    if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
-        debugs(5, 5, HERE << " doing one.");
+    while (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
         /* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */
-        ListenStateData *temp = deferred.shift();
-        temp->isLimited--;
-        temp->acceptNext();
+        ConnAcceptor *temp = deferred.shift();
+        if (temp != NULL) {
+            debugs(5, 5, HERE << " doing one.");
+            temp->isLimited--;
+            temp->acceptNext();
+            break;
+        }
     }
 }

=== modified file 'src/comm/AcceptLimiter.h'
--- src/comm/AcceptLimiter.h	2010-01-13 01:13:17 +0000
+++ src/comm/AcceptLimiter.h	2010-09-18 02:11:31 +0000
@@ -1,12 +1,12 @@
-#ifndef _SQUID_SRC_COMM_ACCEPT_LIMITER_H
-#define _SQUID_SRC_COMM_ACCEPT_LIMITER_H
+#ifndef _SQUID_SRC_COMM_ACCEPTLIMITER_H
+#define _SQUID_SRC_COMM_ACCEPTLIMITER_H
 
 #include "Array.h"
 
 namespace Comm
 {
 
-class ListenStateData;
+class ConnAcceptor;
 
 /**
  * FIFO Queue holding listener socket handlers which have been activated
@@ -14,7 +14,8 @@
  * But when doing so there were not enough FD available to handle the
  * new connection. These handlers are awaiting some FD to become free.
  *
- * defer - used only by Comm layer ListenStateData adding themselves when FD are limited.
+ * defer - used only by Comm layer ConnAcceptor adding themselves when FD are limited.
+ * removeDead - used only by Comm layer ConnAcceptor to remove themselves when dying.
  * kick - used by Comm layer when FD are closed.
  */
 class AcceptLimiter
@@ -25,7 +26,10 @@
     static AcceptLimiter &Instance();
 
     /** delay accepting a new client connection. */
-    void defer(Comm::ListenStateData *afd);
+    void defer(Comm::ConnAcceptor *afd);
+
+    /** remove all records of an acceptor. Only to be called by the ConnAcceptor::swanSong() */
+    void removeDead(const Comm::ConnAcceptor *afd);
 
     /** try to accept and begin processing any delayed client connections. */
     void kick();
@@ -34,9 +38,9 @@
     static AcceptLimiter Instance_;
 
     /** FIFO queue */
-    Vector<Comm::ListenStateData*> deferred;
+    Vector<Comm::ConnAcceptor*> deferred;
 };
 
 }; // namepace Comm
 
-#endif /* _SQUID_SRC_COMM_ACCEPT_LIMITER_H */
+#endif /* _SQUID_SRC_COMM_ACCEPTLIMITER_H */

=== renamed file 'src/comm/ListenStateData.cc' => 'src/comm/ConnAcceptor.cc'
--- src/comm/ListenStateData.cc	2010-08-10 03:11:19 +0000
+++ src/comm/ConnAcceptor.cc	2010-09-18 01:56:02 +0000
@@ -33,15 +33,100 @@
  */
 
 #include "squid.h"
+#include "base/TextException.h"
 #include "CommCalls.h"
 #include "comm/AcceptLimiter.h"
 #include "comm/comm_internal.h"
-#include "comm/ListenStateData.h"
-#include "ConnectionDetail.h"
+#include "comm/Connection.h"
+#include "comm/ConnAcceptor.h"
 #include "fde.h"
 #include "protos.h"
 #include "SquidTime.h"
 
+namespace Comm {
+    CBDATA_CLASS_INIT(ConnAcceptor);
+};
+
+Comm::ConnAcceptor::ConnAcceptor(const Comm::ConnectionPointer &newConn, const char *note, const Subscription::Pointer &aSub) :
+        AsyncJob("Comm::ConnAcceptor"),
+        errcode(0),
+        isLimited(0),
+        theCallSub(aSub),
+        conn(newConn)
+{
+    assert(newConn != NULL);
+
+    /* open the conn if it's not already open */
+    if (!IsConnOpen(conn)) {
+        conn->fd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, conn->local, conn->flags, note);
+        errcode = errno;
+
+        if (!conn->isOpen()) {
+            debugs(5, DBG_CRITICAL, HERE << "comm_open failed: " << conn << " error: " << errcode);
+            conn = NULL;
+            return;
+        }
+        debugs(9, 3, HERE << "Unconnected data socket created on " << conn);
+    }
+    assert(IsConnOpen(newConn));
+}
+
+void
+Comm::ConnAcceptor::subscribe(const Subscription::Pointer &aSub)
+{
+    debugs(5, 5, HERE << conn << " AsyncCall Subscription: " << aSub);
+    unsubscribe("subscription change");
+    theCallSub = aSub;
+}
+
+void
+Comm::ConnAcceptor::unsubscribe(const char *reason)
+{
+    debugs(5, 5, HERE << conn << " AsyncCall Subscription " << theCallSub << " removed: " << reason);
+    theCallSub = NULL;
+}
+
+void
+Comm::ConnAcceptor::start()
+{
+    debugs(5, 5, HERE << conn << " AsyncCall Subscription: " << theCallSub);
+
+    Must(IsConnOpen(conn));
+
+    setListen();
+
+    // if no error so far start accepting connections.
+    if (errcode == 0)
+        commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0);
+}
+
+bool
+Comm::ConnAcceptor::doneAll() const
+{
+    // stop when FD is closed
+    if (!IsConnOpen(conn)) {
+        return AsyncJob::doneAll();
+    }
+
+    // stop when handlers are gone
+    if (theCallSub == NULL) {
+        return AsyncJob::doneAll();
+    }
+
+    // open FD with handlers...keep accepting.
+    return false;
+}
+
+void
+Comm::ConnAcceptor::swanSong()
+{
+    debugs(5,5, HERE);
+    unsubscribe("swanSong");
+    conn = NULL;
+    AcceptLimiter::Instance().removeDead(this);
+    AsyncJob::swanSong();
+}
+
 /**
  * New-style listen and accept routines
  *
@@ -50,11 +135,11 @@
  * accept()ed some time later.
  */
 void
-Comm::ListenStateData::setListen()
+Comm::ConnAcceptor::setListen()
 {
     errcode = 0; // reset local errno copy.
-    if (listen(fd, Squid_MaxFD >> 2) < 0) {
-        debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
+    if (listen(conn->fd, Squid_MaxFD >> 2) < 0) {
+        debugs(50, DBG_CRITICAL, "ERROR: listen(" << conn << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
         errcode = errno;
         return;
     }
@@ -63,40 +148,22 @@
 #ifdef SO_ACCEPTFILTER
         struct accept_filter_arg afa;
         bzero(&afa, sizeof(afa));
-        debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd);
+        debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on " << conn);
         xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
-        if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
-            debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
+        if (setsockopt(conn->fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
+            debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
 #elif defined(TCP_DEFER_ACCEPT)
         int seconds = 30;
         if (strncmp(Config.accept_filter, "data=", 5) == 0)
             seconds = atoi(Config.accept_filter + 5);
-        if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
-            debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
+        if (setsockopt(conn->fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
+            debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
 #else
-        debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
+        debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS");
 #endif
     }
 }
 
-Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) :
-        fd(aFd),
-        theCallback(call),
-        mayAcceptMore(accept_many)
-{
-    assert(aFd >= 0);
-    debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
-    assert(isOpen(aFd));
-    setListen();
-    commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
-}
-
-Comm::ListenStateData::~ListenStateData()
-{
-    comm_close(fd);
-    fd = -1;
-}
-
 /**
  * This private callback is called whenever a filedescriptor is ready
  * to dupe itself and fob off an accept()ed connection
@@ -107,23 +174,30 @@
  * done later when enough sockets become available.
  */
 void
-Comm::ListenStateData::doAccept(int fd, void *data)
+Comm::ConnAcceptor::doAccept(int fd, void *data)
 {
-    debugs(5, 2, HERE << "New connection on FD " << fd);
-
-    assert(isOpen(fd));
-    ListenStateData *afd = static_cast<ListenStateData*>(data);
-
-    if (!okToAccept()) {
-        AcceptLimiter::Instance().defer(afd);
-    } else {
-        afd->acceptNext();
+    try {
+        debugs(5, 2, HERE << "New connection on FD " << fd);
+
+        Must(isOpen(fd));
+        ConnAcceptor *afd = static_cast<ConnAcceptor*>(data);
+
+        if (!okToAccept()) {
+            AcceptLimiter::Instance().defer(afd);
+        } else {
+            afd->acceptNext();
+        }
+        commSetSelect(fd, COMM_SELECT_READ, Comm::ConnAcceptor::doAccept, afd, 0);
+
+    } catch(TextException &e) {
+        fatalf("FATAL: error while accepting new client connection: %s\n", e.message);
+    } catch(...) {
+        fatal("FATAL: error while accepting new client connection: [unkown]\n");
     }
-    commSetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0);
 }
 
 bool
-Comm::ListenStateData::okToAccept()
+Comm::ConnAcceptor::okToAccept()
 {
     static time_t last_warn = 0;
 
@@ -139,7 +213,7 @@
 }
 
 void
-Comm::ListenStateData::acceptOne()
+Comm::ConnAcceptor::acceptOne()
 {
     /*
      * We don't worry about running low on FDs here.  Instead,
@@ -148,42 +222,43 @@
      */
 
     /* Accept a new connection */
-    ConnectionDetail connDetails;
-    int newfd = oldAccept(connDetails);
+    ConnectionPointer newConnDetails = new Connection();
+    comm_err_t status = oldAccept(newConnDetails);
 
     /* Check for errors */
-    if (newfd < 0) {
+    if (!newConnDetails->isOpen()) {
 
-        if (newfd == COMM_NOMESSAGE) {
+        if (status == COMM_NOMESSAGE) {
             /* register interest again */
-            debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback);
-            commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+            debugs(5, 5, HERE << "try later: " << conn << " handler Subscription: " << theCallSub);
+            commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0);
             return;
         }
 
         // A non-recoverable error; notify the caller */
-        debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback);
-        notify(-1, COMM_ERROR, connDetails);
-        mayAcceptMore = false;
+        debugs(5, 5, HERE << "non-recoverable error: " << conn << " handler Subscription: " << theCallSub);
+        notify(status, newConnDetails);
+        mustStop("Listener socket closed");
         return;
     }
 
-    debugs(5, 5, HERE << "accepted: FD " << fd <<
-           " newfd: " << newfd << " from: " << connDetails.peer <<
-           " handler: " << theCallback);
-    notify(newfd, COMM_OK, connDetails);
+    debugs(5, 5, HERE << "Listener: " << conn <<
+           " accepted new connection " << newConnDetails <<
+           " handler Subscription: " << theCallSub);
+    notify(status, newConnDetails);
 }
 
 void
-Comm::ListenStateData::acceptNext()
+Comm::ConnAcceptor::acceptNext()
 {
-    assert(isOpen(fd));
-    debugs(5, 2, HERE << "connection on FD " << fd);
+    Must(IsConnOpen(conn));
+    debugs(5, 2, HERE << "connection on " << conn);
     acceptOne();
 }
 
+// NP: can't be a const function because syncWithComm() side effects hit theCallSub->callback().
 void
-Comm::ListenStateData::notify(int newfd, comm_err_t flag, const ConnectionDetail &connDetails)
+Comm::ConnAcceptor::notify(comm_err_t flag, const Comm::ConnectionPointer &newConnDetails)
 {
     // listener socket handlers just abandon the port with COMM_ERR_CLOSING
     // it should only happen when this object is deleted...
@@ -191,89 +266,95 @@
         return;
     }
 
-    if (theCallback != NULL) {
-        typedef CommAcceptCbParams Params;
-        Params &params = GetCommParams<Params>(theCallback);
-        params.fd = fd;
-        params.nfd = newfd;
-        params.details = connDetails;
+    if (theCallSub != NULL) {
+        AsyncCall::Pointer call = theCallSub->callback();
+        CommAcceptCbParams &params = GetCommParams<CommAcceptCbParams>(call);
+        params.fd = conn->fd;
+        params.conn = newConnDetails;
         params.flag = flag;
         params.xerrno = errcode;
-        ScheduleCallHere(theCallback);
-        if (!mayAcceptMore)
-            theCallback = NULL;
+        ScheduleCallHere(call);
     }
 }
 
 /**
  * accept() and process
- * Wait for an incoming connection on FD.
+ * Wait for an incoming connection on our listener socket.
+ *
+ * \retval COMM_OK         success. details parameter filled.
+ * \retval COMM_NOMESSAGE  attempted accept() but nothing useful came in.
+ * \retval COMM_ERROR      an outright failure occurred.
+ *                         Or if this client has too many connections already.
  */
-int
-Comm::ListenStateData::oldAccept(ConnectionDetail &details)
+comm_err_t
+Comm::ConnAcceptor::oldAccept(Comm::ConnectionPointer &details)
 {
     PROF_start(comm_accept);
     statCounter.syscalls.sock.accepts++;
     int sock;
     struct addrinfo *gai = NULL;
-    details.me.InitAddrInfo(gai);
+    details->local.InitAddrInfo(gai);
 
     errcode = 0; // reset local errno copy.
-    if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
+    if ((sock = accept(conn->fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
         errcode = errno; // store last accept errno locally.
 
-        details.me.FreeAddrInfo(gai);
+        details->local.FreeAddrInfo(gai);
 
         PROF_stop(comm_accept);
 
         if (ignoreErrno(errno)) {
-            debugs(50, 5, HERE << "FD " << fd << ": " << xstrerror());
+            debugs(50, 5, HERE << conn << ": " << xstrerror());
             return COMM_NOMESSAGE;
         } else if (ENFILE == errno || EMFILE == errno) {
-            debugs(50, 3, HERE << "FD " << fd << ": " << xstrerror());
+            debugs(50, 3, HERE << conn << ": " << xstrerror());
             return COMM_ERROR;
         } else {
-            debugs(50, 1, HERE << "FD " << fd << ": " << xstrerror());
+            debugs(50, 1, HERE << conn << ": " << xstrerror());
             return COMM_ERROR;
         }
     }
 
-    details.peer = *gai;
+    Must(sock >= 0);
+    details->fd = sock;
+    details->remote = *gai;
 
     if ( Config.client_ip_max_connections >= 0) {
-        if (clientdbEstablished(details.peer, 0) > Config.client_ip_max_connections) {
-            debugs(50, DBG_IMPORTANT, "WARNING: " << details.peer << " attempting more than " << Config.client_ip_max_connections << " connections.");
-            details.me.FreeAddrInfo(gai);
+        if (clientdbEstablished(details->remote, 0) > Config.client_ip_max_connections) {
+            debugs(50, DBG_IMPORTANT, "WARNING: " << details->remote << " attempting more than " << Config.client_ip_max_connections << " connections.");
+            details->local.FreeAddrInfo(gai);
             return COMM_ERROR;
         }
     }
 
-    details.me.InitAddrInfo(gai);
-
-    details.me.SetEmpty();
+    // lookup the local-end details of this new connection
+    details->local.InitAddrInfo(gai);
+    details->local.SetEmpty();
     getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
-    details.me = *gai;
-
-    commSetCloseOnExec(sock);
+    details->local = *gai;
+    details->local.FreeAddrInfo(gai);
 
     /* fdstat update */
+    // XXX : these are not all HTTP requests. use a note about type and ip:port details,
+    // so we end up with a uniform "(HTTP|FTP-data|HTTPS|...) remote-ip:remote-port"
     fd_open(sock, FD_SOCKET, "HTTP Request");
 
     fdd_table[sock].close_file = NULL;
     fdd_table[sock].close_line = 0;
 
     fde *F = &fd_table[sock];
-    details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
-    F->remote_port = details.peer.GetPort();
-    F->local_addr.SetPort(details.me.GetPort());
-    F->sock_family = details.me.IsIPv6()?AF_INET6:AF_INET;
-    details.me.FreeAddrInfo(gai);
+    details->remote.NtoA(F->ipaddr,MAX_IPSTRLEN);
+    F->remote_port = details->remote.GetPort();
+    F->local_addr = details->local;
+    F->sock_family = details->local.IsIPv6()?AF_INET6:AF_INET;
 
+    // set socket flags
+    commSetCloseOnExec(sock);
     commSetNonBlocking(sock);
 
     /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
-    F->flags.transparent = fd_table[fd].flags.transparent;
+    F->flags.transparent = fd_table[conn->fd].flags.transparent;
 
     PROF_stop(comm_accept);
-    return sock;
+    return COMM_OK;
 }

=== renamed file 'src/comm/ListenStateData.h' => 'src/comm/ConnAcceptor.h'
--- src/comm/ListenStateData.h	2010-08-09 10:48:17 +0000
+++ src/comm/ConnAcceptor.h	2010-09-10 11:31:04 +0000
@@ -1,40 +1,77 @@
-#ifndef SQUID_LISTENERSTATEDATA_H
-#define SQUID_LISTENERSTATEDATA_H
+#ifndef SQUID_COMM_CONNACCEPTOR_H
+#define SQUID_COMM_CONNACCEPTOR_H
 
 #include "config.h"
-#include "base/AsyncCall.h"
-#include "comm.h"
+#include "base/Subscription.h"
+#include "CommCalls.h"
+#include "comm/comm_err_t.h"
+#include "comm/forward.h"
+
 #if HAVE_MAP
 #include <map>
 #endif
 
-class ConnectionDetail;
-
 namespace Comm
 {
 
-class ListenStateData
+class AcceptLimiter;
+
+/**
+ * Listens on a Comm::Connection for new incoming connections and
+ * emits an active Comm::Connection descriptor for the new client.
+ *
+ * Handles all event limiting required to quash inbound connection
+ * floods within the global FD limits of available Squid_MaxFD and
+ * client_ip_max_connections.
+ *
+ * Fills the emitted connection with all connection details able to
+ * be looked up. Currently these are the local/remote IP:port details
+ * and the listening socket transparent-mode flag.
+ */
+class ConnAcceptor : public AsyncJob
 {
+private:
+    virtual void start();
+    virtual bool doneAll() const;
+    virtual void swanSong();
 
 public:
-    ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many);
-    ListenStateData(const ListenStateData &r); // not implemented.
-    ~ListenStateData();
-
-    void subscribe(AsyncCall::Pointer &call);
+    ConnAcceptor(const Comm::ConnectionPointer &conn, const char *note, const Subscription::Pointer &aSub);
+    ConnAcceptor(const ConnAcceptor &r); // not implemented.
+
+    /** Subscribe a handler to receive calls back about new connections.
+     * Replaces any existing subscribed handler.
+     */
+    void subscribe(const Subscription::Pointer &aSub);
+
+    /** Remove the currently waiting callback subscription.
+     * Pending calls will remain scheduled.
+     */
+    void unsubscribe(const char *reason);
+
+    /** Try and accept another connection (synchronous).
+     * If one is pending already the subscribed callback handler will be scheduled
+     * to handle it before this method returns.
+     */
     void acceptNext();
-    void notify(int newfd, comm_err_t flag, const ConnectionDetail &details);
 
-    int fd;
+    /// Call the subscribed callback handler with details about a new connection.
+    void notify(comm_err_t flag, const Comm::ConnectionPointer &details);
 
     /// errno code of the last accept() or listen() action if one occurred.
     int errcode;
 
-    /// whether this socket is delayed and on the AcceptLimiter queue.
-    int32_t isLimited;
-
-private:
-    /// Method to test if there are enough file escriptors to open a new client connection
+private:
+    friend class AcceptLimiter;
+    int32_t isLimited;                   ///< whether this socket is delayed and on the AcceptLimiter queue.
+    Subscription::Pointer theCallSub;    ///< used to generate AsyncCalls handling our events.
+
+    /// conn being listened on for new connections
+    /// Reserved for read-only use.
+    ConnectionPointer conn;
+
+private:
+    /// Method to test if there are enough file descriptors to open a new client connection
     /// if not the accept() will be postponed
     static bool okToAccept();
 
@@ -42,14 +79,12 @@
     static void doAccept(int fd, void *data);
 
     void acceptOne();
-    int oldAccept(ConnectionDetail &details);
-
-    AsyncCall::Pointer theCallback;
-    bool mayAcceptMore;
-
+    comm_err_t oldAccept(Comm::ConnectionPointer &details);
     void setListen();
+
+    CBDATA_CLASS2(ConnAcceptor);
 };
 
 }; // namespace Comm
 
-#endif /* SQUID_LISTENERSTATEDATA_H */
+#endif /* SQUID_COMM_CONNACCEPTOR_H */

=== added file 'src/comm/ConnOpener.cc'
--- src/comm/ConnOpener.cc	1970-01-01 00:00:00 +0000
+++ src/comm/ConnOpener.cc	2010-09-05 04:12:56 +0000
@@ -0,0 +1,290 @@
+/*
+ * DEBUG: section 05    Socket Connection Opener
+ */
+
+#include "config.h"
+#include "base/TextException.h"
+#include "comm/ConnOpener.h"
+#include "comm/Connection.h"
+#include "comm.h"
+#include "fde.h"
+#include "icmp/net_db.h"
+#include "SquidTime.h"
+
+namespace Comm {
+    CBDATA_CLASS_INIT(ConnOpener);
+};
+
+Comm::ConnOpener::ConnOpener(Comm::ConnectionPointer &c, AsyncCall::Pointer &handler, time_t ctimeout) :
+        AsyncJob("Comm::ConnOpener"),
+        host_(NULL),
+        conn_(c),
+        callback_(handler),
+        totalTries_(0),
+        failRetries_(0),
+        connectTimeout_(ctimeout),
+        connStart_(0)
+{}
+
+Comm::ConnOpener::~ConnOpener()
+{
+    safe_free(host_);
+}
+
+bool
+Comm::ConnOpener::doneAll() const
+{
+    // is the conn_ to be opened still waiting?
+    if (conn_ != NULL) {
+        return false;
+    }
+
+    // is the callback still to be called?
+    if (callback_ != NULL) {
+        return false;
+    }
+
+    return AsyncJob::doneAll();
+}
+
+void
+Comm::ConnOpener::swanSong()
+{
+    // cancel any event watchers
+    // done here to get the "swanSong" mention in cancel debugging.
+    if (calls_.earlyAbort_ != NULL) {
+        calls_.earlyAbort_->cancel("Comm::ConnOpener::swanSong");
+        calls_.earlyAbort_ = NULL;
+    }
+    if (calls_.timeout_ != NULL) {
+        calls_.timeout_->cancel("Comm::ConnOpener::swanSong");
+        calls_.timeout_ = NULL;
+    }
+
+    // recover what we can from the job
+    if (conn_ != NULL && conn_->isOpen()) {
+        // it never reached fully open, so abort the FD
+        conn_->close();
+        fd_table[conn_->fd].flags.open = 0;
+        // inform the caller
+        doneConnecting(COMM_ERR_CONNECT, 0);
+    }
+
+    AsyncJob::swanSong();
+}
+
+void
+Comm::ConnOpener::setHost(const char * new_host)
+{
+    // unset and erase if already set.
+    if (host_ != NULL)
+        safe_free(host_);
+
+    // set the new one if given.
+    if (new_host != NULL)
+        host_ = xstrdup(new_host);
+}
+
+const char *
+Comm::ConnOpener::getHost() const
+{
+    return host_;
+}
+
+/**
+ * Connection attempts are completed. One way or the other.
+ * Pass the results back to the external handler.
+ */
+void
+Comm::ConnOpener::doneConnecting(comm_err_t status, int xerrno)
+{
+    if (callback_ != NULL) {
+        typedef CommConnectCbParams Params;
+        Params &params = GetCommParams<Params>(callback_);
+        params.conn = conn_;
+        params.flag = status;
+        params.xerrno = xerrno;
+        ScheduleCallHere(callback_);
+        callback_ = NULL;
+    }
+
+    /* ensure cleared local state, we are done. */
+    conn_ = NULL;
+}
+
+void
+Comm::ConnOpener::start()
+{
+    Must(conn_ != NULL);
+
+    /* get a socket open ready for connecting with */
+    if (!conn_->isOpen()) {
+#if USE_IPV6
+        /* outbound sockets have no need to be protocol agnostic. */
+        if (conn_->remote.IsIPv4()) {
+            conn_->local.SetIPv4();
+        }
+#endif
+        conn_->fd = comm_openex(SOCK_STREAM, IPPROTO_TCP, conn_->local, conn_->flags, conn_->tos, host_);
+        if (!conn_->isOpen()) {
+            doneConnecting(COMM_ERR_CONNECT, 0);
+            return;
+        }
+
+        if (calls_.earlyAbort_ == NULL) {
+            typedef CommCbMemFunT<Comm::ConnOpener, CommConnectCbParams> Dialer;
+            calls_.earlyAbort_ = asyncCall(5, 4, "Comm::ConnOpener::earlyAbort",
+                                         Dialer(this, &Comm::ConnOpener::earlyAbort));
+            comm_add_close_handler(conn_->fd, calls_.earlyAbort_);
+        }
+
+        if (calls_.timeout_ == NULL) {
+            typedef CommCbMemFunT<Comm::ConnOpener, CommTimeoutCbParams> Dialer;
+            calls_.timeout_ = asyncCall(5, 4, "Comm::ConnOpener::timeout",
+                                      Dialer(this, &Comm::ConnOpener::timeout));
+            debugs(5, 3, HERE << conn_ << " timeout " << connectTimeout_);
+            commSetTimeout(conn_->fd, connectTimeout_, calls_.timeout_);
+        }
+
+        if (connStart_ == 0) {
+            connStart_ = squid_curtime;
+        }
+    }
+
+    typedef CommCbMemFunT<Comm::ConnOpener, CommConnectCbParams> Dialer;
+    calls_.connect_ = asyncCall(5, 4, "Comm::ConnOpener::connect",
+                                Dialer(this, &Comm::ConnOpener::connect));
+    ScheduleCallHere(calls_.connect_);
+}
+
+/** Make an FD connection attempt.
+ * Handles the case(s) when a partially setup connection gets closed early.
+ */
+void
+Comm::ConnOpener::connect(const CommConnectCbParams &unused)
+{
+    Must(conn_ != NULL);
+
+    totalTries_++;
+
+    switch (comm_connect_addr(conn_->fd, conn_->remote) ) {
+
+    case COMM_INPROGRESS:
+        // check for timeout FIRST.
+        if(squid_curtime - connStart_ > connectTimeout_) {
+            debugs(5, 5, HERE << conn_ << ": * - ERR took too long already.");
+            doneConnecting(COMM_TIMEOUT, errno);
+            return;
+        } else {
+            debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS");
+            commSetSelect(conn_->fd, COMM_SELECT_WRITE, Comm::ConnOpener::ConnectRetry, this, 0);
+        }
+        break;
+
+    case COMM_OK:
+        debugs(5, 5, HERE << conn_ << ": COMM_OK - connected");
+
+        /*
+         * stats.conn_open is used to account for the number of
+         * connections that we have open to the peer, so we can limit
+         * based on the max-conn option.  We need to increment here,
+         * even if the connection may fail.
+         */
+        if (conn_->getPeer())
+            conn_->getPeer()->stats.conn_open++;
+
+        lookupLocalAddress();
+
+        /* TODO: remove these fd_table accesses. But old code still depends on fd_table flags to
+         *       indicate the state of a raw fd object being passed around.
+         *       Also, legacy code still depends on comm_local_port() with no access to Comm::Connection
+         *       when those are done comm_local_port can become one of our member functions to do the below.
+         */
+        fd_table[conn_->fd].flags.open = 1;
+        fd_table[conn_->fd].local_addr = conn_->local;
+
+        if (host_ != NULL)
+            ipcacheMarkGoodAddr(host_, conn_->remote);
+        doneConnecting(COMM_OK, 0);
+        break;
+
+    default:
+        debugs(5, 5, HERE << conn_ << ": * - try again");
+        failRetries_++;
+        if (host_ != NULL)
+            ipcacheMarkBadAddr(host_, conn_->remote);
+#if USE_ICMP
+        if (Config.onoff.test_reachability)
+            netdbDeleteAddrNetwork(conn_->remote);
+#endif
+
+        // check for timeout FIRST.
+        if(squid_curtime - connStart_ > connectTimeout_) {
+            debugs(5, 5, HERE << conn_ << ": * - ERR took too long already.");
+            doneConnecting(COMM_TIMEOUT, errno);
+        } else if (failRetries_ < Config.connect_retries) {
+            ScheduleCallHere(calls_.connect_);
+        } else {
+            // send ERROR back to the upper layer.
+            debugs(5, 5, HERE << conn_ << ": * - ERR tried too many times already.");
+            doneConnecting(COMM_ERR_CONNECT, errno);
+        }
+    }
+}
+
+/**
+ * Lookup local-end address and port of the TCP link just opened.
+ * This ensures the connection's local details are set correctly.
+ */
+void
+Comm::ConnOpener::lookupLocalAddress()
+{
+    struct addrinfo *addr = NULL;
+    conn_->local.InitAddrInfo(addr);
+
+    if (getsockname(conn_->fd, addr->ai_addr, &(addr->ai_addrlen)) != 0) {
+        debugs(50, DBG_IMPORTANT, "ERROR: Failed to retrieve TCP/UDP details for socket: " << conn_ << ": " << xstrerror());
+        conn_->local.FreeAddrInfo(addr);
+        return;
+    }
+
+    conn_->local = *addr;
+    conn_->local.FreeAddrInfo(addr);
+    debugs(5, 6, HERE << conn_);
+}
+
+/** Abort connection attempt.
+ * Handles the case(s) when a partially setup connection gets closed early.
+ */
+void
+Comm::ConnOpener::earlyAbort(const CommConnectCbParams &io)
+{
+    debugs(5, 3, HERE << io.conn);
+    doneConnecting(COMM_ERR_CLOSING, io.xerrno); // NP: is closing or shutdown better?
+}
+
+/**
+ * Handles the case(s) when a partially setup connection gets timed out.
+ * NP: When commSetTimeout accepts generic CommCommonCbParams this can die.
+ */
+void
+Comm::ConnOpener::timeout(const CommTimeoutCbParams &unused)
+{
+    ScheduleCallHere(calls_.connect_);
+}
+
+/* Legacy Wrapper for the retry event after COMM_INPROGRESS
+ * TODO: As soon as comm IO accepts Async calls we can use a ConnOpener::connect call
+ */
+void
+Comm::ConnOpener::ConnectRetry(int fd, void *data)
+{
+    ConnOpener *cs = static_cast<Comm::ConnOpener *>(data);
+
+    // Ew. we are now outside all of the AsyncJob protections.
+    // get back inside by scheduling another call...
+    typedef CommCbMemFunT<Comm::ConnOpener, CommConnectCbParams> Dialer;
+    AsyncCall::Pointer call = asyncCall(5, 4, "Comm::ConnOpener::connect",
+                                        Dialer(cs, &Comm::ConnOpener::connect));
+    ScheduleCallHere(call);
+}

=== added file 'src/comm/ConnOpener.h'
--- src/comm/ConnOpener.h	1970-01-01 00:00:00 +0000
+++ src/comm/ConnOpener.h	2010-09-06 12:06:44 +0000
@@ -0,0 +1,72 @@
+#ifndef _SQUID_SRC_COMM_OPENERSTATEDATA_H
+#define _SQUID_SRC_COMM_OPENERSTATEDATA_H
+
+#include "base/AsyncCall.h"
+#include "base/AsyncJob.h"
+#include "cbdata.h"
+#include "CommCalls.h"
+#include "comm/comm_err_t.h"
+#include "comm/forward.h"
+
+namespace Comm {
+
+/**
+ * Async-opener of a Comm connection.
+ */
+class ConnOpener : public AsyncJob
+{
+protected:
+    virtual void start();
+    virtual void swanSong();
+
+public:
+    virtual bool doneAll() const;
+
+    ConnOpener(Comm::ConnectionPointer &, AsyncCall::Pointer &handler, time_t connect_timeout);
+    ~ConnOpener();
+
+    void setHost(const char *);    ///< set the hostname note for this connection
+    const char * getHost() const;  ///< get the hostname noted for this connection
+
+private:
+    // Undefined because two openers cannot share a connection
+    ConnOpener(const ConnOpener &);
+    ConnOpener & operator =(const ConnOpener &c);
+
+    void connect(const CommConnectCbParams &unused);
+    void earlyAbort(const CommConnectCbParams &);
+    void timeout(const CommTimeoutCbParams &unused);
+    void doneConnecting(comm_err_t status, int xerrno);
+    static void ConnectRetry(int fd, void *data);
+    void lookupLocalAddress();
+
+private:
+    char *host_;                         ///< domain name we are trying to connect to.
+    Comm::ConnectionPointer conn_;       ///< single connection currently being opened.
+    AsyncCall::Pointer callback_;        ///< handler to be called on connection completion.
+
+    int totalTries_;   ///< total number of connection attempts over all destinations so far.
+    int failRetries_;  ///< number of times the current destination has been retried.
+
+    /**
+     * time at which to abandon the connection.
+     * the connection-done callback will be passed COMM_TIMEOUT
+     */
+    time_t connectTimeout_;
+
+    /// time at which this series of connection attempts was started.
+    time_t connStart_;
+
+    /// handles to calls which we may need to cancel.
+    struct Calls {
+        AsyncCall::Pointer connect_;
+        AsyncCall::Pointer earlyAbort_;
+        AsyncCall::Pointer timeout_;
+    } calls_;
+
+    CBDATA_CLASS2(ConnOpener);
+};
+
+}; // namespace Comm
+
+#endif /* _SQUID_SRC_COMM_OPENERSTATEDATA_H */

=== added file 'src/comm/Connection.cc'
--- src/comm/Connection.cc	1970-01-01 00:00:00 +0000
+++ src/comm/Connection.cc	2010-08-13 09:20:32 +0000
@@ -0,0 +1,77 @@
+#include "config.h"
+#include "cbdata.h"
+#include "comm.h"
+#include "comm/Connection.h"
+
+bool
+Comm::IsConnOpen(const Comm::ConnectionPointer &conn)
+{
+    return conn != NULL && conn->isOpen();
+}
+
+
+Comm::Connection::Connection() :
+        local(),
+        remote(),
+        peerType(HIER_NONE),
+        fd(-1),
+        tos(0),
+        flags(COMM_NONBLOCKING),
+        _peer(NULL)
+{}
+
+Comm::Connection::~Connection()
+{
+    close();
+    cbdataReferenceDone(_peer);
+}
+
+Comm::ConnectionPointer
+Comm::Connection::copyDetails() const
+{
+    ConnectionPointer c = new Comm::Connection;
+
+    c->local = local;
+    c->remote = remote;
+    c->peerType = peerType;
+    c->tos = tos;
+    c->flags = flags;
+ 
+    // ensure FD is not open in the new copy.
+    c->fd = -1;
+
+    // ensure we have a cbdata reference to _peer not a straight ptr copy.
+    c->_peer = cbdataReference(_peer);
+
+    return c;
+}
+
+void
+Comm::Connection::close()
+{
+    if (isOpen()) {
+        comm_close(fd);
+        fd = -1;
+        if (_peer)
+            _peer->stats.conn_open--;
+    }
+}
+
+void
+Comm::Connection::setPeer(peer *p)
+{
+    /* set to self. nothing to do. */
+    if (_peer == p)
+        return;
+
+    /* clear any previous ptr */
+    if (_peer) {
+        cbdataReferenceDone(_peer);
+        _peer = NULL;
+    }
+
+    /* set the new one (unless it is NULL) */
+    if (p) {
+        _peer = cbdataReference(p);
+    }
+}

=== renamed file 'src/ConnectionDetail.h' => 'src/comm/Connection.h'
--- src/ConnectionDetail.h	2010-05-26 03:06:02 +0000
+++ src/comm/Connection.h	2010-08-17 07:45:20 +0000
@@ -1,5 +1,6 @@
 /*
  * DEBUG: section 05    Socket Functions
+ * AUTHOR: Amos Jeffries
  * AUTHOR: Robert Collins
  *
  * SQUID Web Proxy Cache          http://www.squid-cache.org/
@@ -30,23 +31,138 @@
  *
  *
  * Copyright (c) 2003, Robert Collins <[email protected]>
+ * Copyright (c) 2010, Amos Jeffries <[email protected]>
  */
 
 #ifndef _SQUIDCONNECTIONDETAIL_H_
 #define _SQUIDCONNECTIONDETAIL_H_
 
+#include "config.h"
+#include "comm/forward.h"
+#include "hier_code.h"
 #include "ip/Address.h"
-
-class ConnectionDetail
+#include "RefCount.h"
+
+#if HAVE_IOSFWD
+#include <iosfwd>
+#endif
+#if HAVE_OSTREAM
+#include <ostream>
+#endif
+
+struct peer;
+
+namespace Comm {
+
+/* TODO: make these a struct of boolean flag members in the connection instead of a bitmap.
+ * We can't do that until all non-comm code uses Comm::Connection objects to create FDs;
+ * currently there is code still using comm_open() and comm_openex() synchronously!!
+ */
+#define COMM_UNSET              0x00
+#define COMM_NONBLOCKING        0x01
+#define COMM_NOCLOEXEC          0x02
+#define COMM_REUSEADDR          0x04
+#define COMM_TRANSPARENT        0x08
+#define COMM_DOBIND             0x10
+
+/**
+ * Store data about the physical and logical attributes of a connection.
+ *
+ * Some link state can be inferred from the data, however this is not an
+ * object for state data. It is a semantic equivalent of an FD with easily
+ * accessible cached properties not requiring repeated complex lookups.
+ *
+ * While the properties may be changed, this is for the purpose of creating
+ * potential connection descriptors which may be opened. Properties should
+ * be considered read-only outside of the Comm layer code once the connection
+ * is open.
+ *
+ * These objects must not be passed around directly,
+ * but a Comm::ConnectionPointer must be passed instead.
+ */
+class Connection : public RefCountable
 {
-
-public:
-
-    ConnectionDetail();
-
-    Ip::Address me;
-
-    Ip::Address peer;
+public:
+    /** standard empty connection creation */
+    Connection();
+
+    /** Clear the connection properties and close any open socket. */
+    ~Connection();
+
+    /** Copy an existing connection's IP and properties.
+     * This excludes the FD. The new copy will be a closed connection.
+     */
+    ConnectionPointer copyDetails() const;
+
+    /** Close any open socket. */
+    void close();
+
+    /** determine whether this object describes an active connection or not. */
+    bool isOpen() const { return (fd >= 0); }
+
+    /** retrieve the peer pointer for use.
+     * The caller is responsible for all CBDATA operations regarding the
+     * use of the pointer returned.
+     */
+    peer * const getPeer() const { return _peer; }
+
+    /** alter the stored peer pointer.
+     * Perform appropriate CBDATA operations for locking the peer pointer
+     */
+    void setPeer(peer * p);
+
+private:
+    /** These objects may not be exactly duplicated. Use copyDetails() instead. */
+    Connection(const Connection &c);
+
+    /** These objects may not be exactly duplicated. Use copyDetails() instead. */
+    Connection & operator =(const Connection &c);
+
+public:
+    /** Address/Port for the Squid end of a TCP link. */
+    Ip::Address local;
+
+    /** Address for the Remote end of a TCP link. */
+    Ip::Address remote;
+
+    /** Hierarchy code for this connection link */
+    hier_code peerType;
+
+    /** Socket used by this connection. -1 if no socket has been opened. */
+    int fd;
+
+    /** Quality of Service TOS values currently sent on this connection */
+    int tos;
+
+    /** COMM flags set on this connection */
+    int flags;
+
+private:
+    /** cache_peer data object (if any) */
+    peer *_peer;
 };
 
+}; // namespace Comm
+
+
+// NP: Order and namespace here is very important.
+//     * The second define inlines the first.
+//     * Stream inheritance overloading is searched in the global scope first.
+
+inline std::ostream &
+operator << (std::ostream &os, const Comm::Connection &conn)
+{
+    os << "FD " << conn.fd << " local=" << conn.local <<
+        " remote=" << conn.remote << " flags=" << conn.flags;
+    return os;
+}
+
+inline std::ostream &
+operator << (std::ostream &os, const Comm::ConnectionPointer &conn)
+{
+    if (conn != NULL)
+        os << *conn;
+    return os;
+}
+
 #endif

=== modified file 'src/comm/Makefile.am'
--- src/comm/Makefile.am	2009-12-31 02:35:01 +0000
+++ src/comm/Makefile.am	2010-08-15 07:52:51 +0000
@@ -1,13 +1,22 @@
 include $(top_srcdir)/src/Common.am
 include $(top_srcdir)/src/TestHeaders.am
 
-noinst_LTLIBRARIES = libcomm-listener.la
+noinst_LTLIBRARIES = libcomm.la
 
-## Library holding listener comm socket handlers
-libcomm_listener_la_SOURCES= \
+## First group are listener comm socket handlers
+## Second group are outbound connection setup handlers
+## Third group are misc shared comm objects
+libcomm_la_SOURCES= \
 	AcceptLimiter.cc \
 	AcceptLimiter.h \
-	ListenStateData.cc \
-	ListenStateData.h \
-	\
-	comm_internal.h
+	ConnAcceptor.cc \
+	ConnAcceptor.h \
+	\
+	ConnOpener.cc \
+	ConnOpener.h \
+	\
+	Connection.cc \
+	Connection.h \
+	comm_err_t.h \
+	comm_internal.h \
+	forward.h

=== added file 'src/comm/comm_err_t.h'
--- src/comm/comm_err_t.h	1970-01-01 00:00:00 +0000
+++ src/comm/comm_err_t.h	2010-05-15 14:57:24 +0000
@@ -0,0 +1,21 @@
+#ifndef _SQUID_COMM_COMM_ERR_T_H
+#define _SQUID_COMM_COMM_ERR_T_H
+
+#include "config.h"
+
+typedef enum {
+    COMM_OK = 0,
+    COMM_ERROR = -1,
+    COMM_NOMESSAGE = -3,
+    COMM_TIMEOUT = -4,
+    COMM_SHUTDOWN = -5,
+    COMM_IDLE = -6, /* there are no active fds and no pending callbacks. */
+    COMM_INPROGRESS = -7,
+    COMM_ERR_CONNECT = -8,
+    COMM_ERR_DNS = -9,
+    COMM_ERR_CLOSING = -10,
+    COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */
+    COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */
+} comm_err_t;
+
+#endif /* _SQUID_COMM_COMM_ERR_T_H */

=== added file 'src/comm/forward.h'
--- src/comm/forward.h	1970-01-01 00:00:00 +0000
+++ src/comm/forward.h	2010-07-24 05:23:58 +0000
@@ -0,0 +1,19 @@
+#ifndef _SQUID_COMM_FORWARD_H
+#define _SQUID_COMM_FORWARD_H
+
+#include "Array.h"
+#include "RefCount.h"
+
+namespace Comm {
+
+class Connection;
+
+typedef RefCount<Comm::Connection> ConnectionPointer;
+
+typedef Vector<Comm::ConnectionPointer> ConnectionList;
+
+bool IsConnOpen(const Comm::ConnectionPointer &conn);
+
+}; // namespace Comm
+
+#endif /* _SQUID_COMM_FORWARD_H */

Reply via email to