Two patches:

patch #1:
This builds on the changes already audited in src/comm and src/base, so that TCP listening ports are now opened with a ConnAcceptor and a Subscription (to emit the HTTP or HTTPS client connect calls).

Alex, can you check that I have understood the IPC systems correctly?

NP: It is now also possible to implement the UDP receivers (SNMP, DNS, ICP, multicast ICP, HTCP) in the event subscription model instead of the reader model. I have not done that, though.

Amos
--
Please be using
  Current Stable Squid 2.7.STABLE9 or 3.1.8
  Beta testers wanted for 3.2.0.2
=== modified file 'src/ipc/Coordinator.cc'
--- src/ipc/Coordinator.cc	2010-07-06 23:09:44 +0000
+++ src/ipc/Coordinator.cc	2010-09-24 09:29:17 +0000
@@ -7,7 +7,9 @@
 
 
 #include "config.h"
+#include "base/Subscription.h"
 #include "comm.h"
+#include "comm/Connection.h"
 #include "ipc/Coordinator.h"
 #include "ipc/FdNotes.h"
 #include "ipc/SharedListen.h"
@@ -81,20 +83,20 @@
            " needs shared listen FD for " << request.params.addr);
     Listeners::const_iterator i = listeners.find(request.params);
     int errNo = 0;
-    const int sock = (i != listeners.end()) ?
+    const Comm::ConnectionPointer c = (i != listeners.end()) ?
                      i->second : openListenSocket(request, errNo);
 
-    debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " <<
+    debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
            request.params.addr << " to kid" << request.requestorId <<
            " mapId=" << request.mapId);
 
-    SharedListenResponse response(sock, errNo, request.mapId);
+    SharedListenResponse response(c, errNo, request.mapId);
     TypedMsgHdr message;
     response.pack(message);
     SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
 }
 
-int
+Comm::ConnectionPointer
 Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
                                    int &errNo)
 {
@@ -103,19 +105,23 @@
     debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
            request.requestorId);
 
-    Ip::Address addr = p.addr; // comm_open_listener may modify it
+    Comm::ConnectionPointer conn = new Comm::Connection;
+    conn->local = p.addr; // comm_open_listener may modify it
+    conn->flags = p.flags;
 
     enter_suid();
-    const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags,
-                                        FdNote(p.fdNote));
-    errNo = (sock >= 0) ? 0 : errno;
+    comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote));
+    errNo = Comm::IsConnOpen(conn) ? 0 : errno;
     leave_suid();
 
+    debugs(54, 6, HERE << "tried listening on " << conn << " for kid" <<
+           request.requestorId);
+
     // cache positive results
-    if (sock >= 0)
-        listeners[request.params] = sock;
+    if (Comm::IsConnOpen(conn))
+        listeners[request.params] = conn;
 
-    return sock;
+    return conn;
 }
 
 void Ipc::Coordinator::broadcastSignal(int sig) const

=== modified file 'src/ipc/Coordinator.h'
--- src/ipc/Coordinator.h	2010-07-05 21:28:23 +0000
+++ src/ipc/Coordinator.h	2010-09-24 07:09:11 +0000
@@ -41,13 +41,13 @@
     void handleSharedListenRequest(const SharedListenRequest& request);
 
     /// calls comm_open_listener()
-    int openListenSocket(const SharedListenRequest& request, int &errNo);
+    Comm::ConnectionPointer openListenSocket(const SharedListenRequest& request, int &errNo);
 
 private:
     typedef Vector<StrandCoord> Strands; ///< unsorted strands
     Strands strands; ///< registered processes and threads
 
-    typedef std::map<OpenListenerParams, int> Listeners; ///< params:fd map
+    typedef std::map<OpenListenerParams, Comm::ConnectionPointer> Listeners; ///< params:fd map
     Listeners listeners; ///< cached comm_open_listener() results
 
     static Coordinator* TheInstance; ///< the only class instance in existence

=== modified file 'src/ipc/Port.cc'
--- src/ipc/Port.cc	2010-08-24 00:12:54 +0000
+++ src/ipc/Port.cc	2010-10-02 10:16:26 +0000
@@ -5,9 +5,10 @@
  *
  */
 
-
 #include "config.h"
+#include "comm.h"
 #include "CommCalls.h"
+#include "comm/Connection.h"
 #include "ipc/Port.h"
 
 const char Ipc::coordinatorAddr[] = DEFAULT_PREFIX "/var/run/coordinator.ipc";
@@ -33,7 +34,7 @@
     typedef CommCbMemFunT<Port, CommIoCbParams> Dialer;
     AsyncCall::Pointer readHandler = JobCallback(54, 6,
                                      Dialer, this, Port::noteRead);
-    comm_read(fd(), buf.raw(), buf.size(), readHandler);
+    comm_read(conn(), buf.raw(), buf.size(), readHandler);
 }
 
 bool Ipc::Port::doneAll() const
@@ -53,7 +54,7 @@
 
 void Ipc::Port::noteRead(const CommIoCbParams& params)
 {
-    debugs(54, 6, HERE << "FD " << params.fd << " flag " << params.flag <<
+    debugs(54, 6, HERE << params.conn << " flag " << params.flag <<
            " [" << this << ']');
     if (params.flag == COMM_OK) {
         assert(params.buf == buf.raw());

=== modified file 'src/ipc/SharedListen.cc'
--- src/ipc/SharedListen.cc	2010-07-06 23:09:44 +0000
+++ src/ipc/SharedListen.cc	2010-09-24 09:28:25 +0000
@@ -6,9 +6,9 @@
  */
 
 #include "config.h"
-#include <map>
+#include "base/TextException.h"
 #include "comm.h"
-#include "base/TextException.h"
+#include "comm/Connection.h"
 #include "ipc/Port.h"
 #include "ipc/Messages.h"
 #include "ipc/Kids.h"
@@ -16,6 +16,7 @@
 #include "ipc/StartListening.h"
 #include "ipc/SharedListen.h"
 
+#include <map>
 
 /// holds information necessary to handle JoinListen response
 class PendingOpenRequest
@@ -80,22 +81,24 @@
 }
 
 
-Ipc::SharedListenResponse::SharedListenResponse(int aFd, int anErrNo, int aMapId):
-        fd(aFd), errNo(anErrNo), mapId(aMapId)
+Ipc::SharedListenResponse::SharedListenResponse(const Comm::ConnectionPointer &c, int anErrNo, int aMapId):
+        conn(c), errNo(anErrNo), mapId(aMapId)
 {
 }
 
 Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr &hdrMsg):
-        fd(-1), errNo(0), mapId(-1)
+        conn(NULL), errNo(0), mapId(-1)
 {
     hdrMsg.getData(mtSharedListenResponse, this, sizeof(*this));
-    fd = hdrMsg.getFd();
+    conn = new Comm::Connection;
+    conn->fd = hdrMsg.getFd();
+    // other conn details are passed in OpenListenerParams and filled out by SharedListenJoin()
 }
 
 void Ipc::SharedListenResponse::pack(TypedMsgHdr &hdrMsg) const
 {
     hdrMsg.putData(mtSharedListenResponse, this, sizeof(*this));
-    hdrMsg.putFd(fd);
+    hdrMsg.putFd(conn->fd);
 }
 
 
@@ -121,9 +124,10 @@
 
 void Ipc::SharedListenJoined(const SharedListenResponse &response)
 {
-    const int fd = response.fd;
+    Comm::ConnectionPointer c = response.conn;
 
-    debugs(54, 3, HERE << "got listening FD " << fd << " errNo=" <<
+    // Dont debugs c fully since only FD is filled right now.
+    debugs(54, 3, HERE << "got listening FD " << c->fd << " errNo=" <<
            response.errNo << " mapId=" << response.mapId);
 
     Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end());
@@ -131,20 +135,23 @@
     Must(por.callback != NULL);
     TheSharedListenRequestMap.erase(response.mapId);
 
-    if (fd >= 0) {
+    if (Comm::IsConnOpen(c)) {
         OpenListenerParams &p = por.params;
+        c->local = p.addr;
+        c->flags = p.flags;
+        // XXX: leave the comm AI stuff to comm_import_opened()?
         struct addrinfo *AI = NULL;
         p.addr.GetAddrInfo(AI);
         AI->ai_socktype = p.sock_type;
         AI->ai_protocol = p.proto;
-        comm_import_opened(fd, p.addr, p.flags, FdNote(p.fdNote), AI);
+        comm_import_opened(c, FdNote(p.fdNote), AI);
         p.addr.FreeAddrInfo(AI);
     }
 
-    StartListeningCb *cbd =
-        dynamic_cast<StartListeningCb*>(por.callback->getDialer());
+    StartListeningCb *cbd = dynamic_cast<StartListeningCb*>(por.callback->getDialer());
     Must(cbd);
-    cbd->fd = fd;
+    cbd->conn = c;
     cbd->errNo = response.errNo;
+    cbd->handlerSubscription = por.params.handlerSubscription;
     ScheduleCallHere(por.callback);
 }

=== modified file 'src/ipc/SharedListen.h'
--- src/ipc/SharedListen.h	2010-07-06 23:09:44 +0000
+++ src/ipc/SharedListen.h	2010-09-24 09:23:04 +0000
@@ -15,7 +15,8 @@
 
 /// "shared listen" is when concurrent processes are listening on the same fd
 
-/// comm_open_listener() parameters holder
+/// Comm::ConnAcceptor parameters holder
+/// all the details necessary to recreate a Comm::Connection and fde entry for the kid listener FD
 class OpenListenerParams
 {
 public:
@@ -23,11 +24,17 @@
 
     bool operator <(const OpenListenerParams &p) const; ///< useful for map<>
 
+    // bits to re-create the fde entry
     int sock_type;
     int proto;
+    int fdNote; ///< index into fd_note() comment strings
+
+    // bits to re-create the listener Comm::Connection descriptor
     Ip::Address addr; ///< will be memset and memcopied
     int flags;
-    int fdNote; ///< index into fd_note() comment strings
+
+    /// handler to subscribe to Comm::ConnAcceptor when we get the response
+    Subscription::Pointer handlerSubscription;
 };
 
 class TypedMsgHdr;
@@ -52,12 +59,12 @@
 class SharedListenResponse
 {
 public:
-    SharedListenResponse(int fd, int errNo, int mapId);
+    SharedListenResponse(const Comm::ConnectionPointer &c, int errNo, int mapId);
     explicit SharedListenResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
     void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
 
 public:
-    int fd; ///< opened listening socket or -1
+    Comm::ConnectionPointer conn; ///< opened listening socket or -1
     int errNo; ///< errno value from comm_open_sharedListen() call
     int mapId; ///< to map future response to the requestor's callback
 };

=== modified file 'src/ipc/StartListening.cc'
--- src/ipc/StartListening.cc	2010-07-06 23:09:44 +0000
+++ src/ipc/StartListening.cc	2010-10-02 02:23:20 +0000
@@ -6,13 +6,16 @@
  */
 
 #include "config.h"
+#include "base/Subscription.h"
+#include "base/TextException.h"
 #include "comm.h"
-#include "base/TextException.h"
+#include "comm/ConnAcceptor.h"
+#include "comm/Connection.h"
 #include "ipc/SharedListen.h"
 #include "ipc/StartListening.h"
 
 
-Ipc::StartListeningCb::StartListeningCb(): fd(-1), errNo(0)
+Ipc::StartListeningCb::StartListeningCb(): conn(NULL), errNo(0)
 {
 }
 
@@ -22,37 +25,44 @@
 
 std::ostream &Ipc::StartListeningCb::startPrint(std::ostream &os) const
 {
-    return os << "(FD " << fd << ", err=" << errNo;
+    return os << "(" << conn << ", err=" << errNo;
 }
 
-
-void Ipc::StartListening(int sock_type, int proto, Ip::Address &addr,
-                         int flags, FdNoteId fdNote, AsyncCall::Pointer &callback)
+void
+Ipc::StartListening(int sock_type, int proto, const Comm::ConnectionPointer &listenConn,
+                    FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub)
 {
-    OpenListenerParams p;
-    p.sock_type = sock_type;
-    p.proto = proto;
-    p.addr = addr;
-    p.flags = flags;
-    p.fdNote = fdNote;
-
     if (UsingSmp()) { // if SMP is on, share
+        OpenListenerParams p;
+        p.sock_type = sock_type;
+        p.proto = proto;
+        p.addr = listenConn->local;
+        p.flags = listenConn->flags;
+        p.fdNote = fdNote;
+        p.handlerSubscription = sub;
+
         Ipc::JoinSharedListen(p, callback);
         return; // wait for the call back
     }
 
+    StartListeningCb *cbd = dynamic_cast<StartListeningCb*>(callback->getDialer());
+    Must(cbd);
+    cbd->conn = listenConn;
+
     enter_suid();
-    const int sock = comm_open_listener(p.sock_type, p.proto, p.addr, p.flags,
-                                        FdNote(p.fdNote));
-    const int errNo = (sock >= 0) ? 0 : errno;
+    if (sock_type == SOCK_STREAM) {
+        // TCP: setup the subscriptions such that new connections accepted by listenConn are handled by HTTP
+        AsyncJob::Start(new Comm::ConnAcceptor(cbd->conn, FdNote(fdNote), sub));
+    } else if (sock_type == SOCK_DGRAM) {
+        // UDP: setup the listener socket, but do not set a subscriber
+        Comm::ConnectionPointer udpConn = listenConn;
+        comm_open_listener(sock_type, proto, udpConn, FdNote(fdNote));
+    } else {
+        fatalf("Invalid Socket Type (%d)",sock_type);
+    }
+    cbd->errNo = cbd->conn->isOpen() ? 0 : errno;
     leave_suid();
 
-    debugs(54, 3, HERE << "opened listen FD " << sock << " for " << p.addr);
-
-    StartListeningCb *cbd =
-        dynamic_cast<StartListeningCb*>(callback->getDialer());
-    Must(cbd);
-    cbd->fd = sock;
-    cbd->errNo = errNo;
+    debugs(54, 3, HERE << "opened listen " << cbd->conn);
     ScheduleCallHere(callback);
 }

=== modified file 'src/ipc/StartListening.h'
--- src/ipc/StartListening.h	2010-07-25 08:10:12 +0000
+++ src/ipc/StartListening.h	2010-09-24 09:06:49 +0000
@@ -9,9 +9,11 @@
 #define SQUID_IPC_START_LISTENING_H
 
 #include "config.h"
+#include "base/AsyncCall.h"
+#include "base/Subscription.h"
+#include "comm/forward.h"
 #include "ip/forward.h"
 #include "ipc/FdNotes.h"
-#include "base/AsyncCall.h"
 
 #if HAVE_IOSFWD
 #include <iosfwd>
@@ -31,14 +33,15 @@
     std::ostream &startPrint(std::ostream &os) const;
 
 public:
-    int fd; ///< opened listening socket or -1
+    Comm::ConnectionPointer conn; ///< opened listening socket
     int errNo; ///< errno value from the comm_open_listener() call
+    Subscription::Pointer handlerSubscription; ///< The subscription we will pass on to the ConnAcceptor
 };
 
 /// Depending on whether SMP is on, either ask Coordinator to send us
-/// the listening FD or call comm_open_listener() directly.
-extern void StartListening(int sock_type, int proto, Ip::Address &addr,
-                           int flags, FdNoteId fdNote, AsyncCall::Pointer &callback);
+/// the listening FD or start a connection acceptor directly.
+extern void StartListening(int sock_type, int proto, const Comm::ConnectionPointer &listenConn,
+                           FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &handlerSub);
 
 } // namespace Ipc;
 

=== modified file 'src/ipc/Strand.cc'
--- src/ipc/Strand.cc	2010-07-06 23:09:44 +0000
+++ src/ipc/Strand.cc	2010-09-24 09:29:47 +0000
@@ -6,13 +6,14 @@
  */
 
 #include "config.h"
+#include "base/Subscription.h"
 #include "base/TextException.h"
+#include "comm/Connection.h"
 #include "ipc/Strand.h"
 #include "ipc/Messages.h"
 #include "ipc/SharedListen.h"
 #include "ipc/Kids.h"
 
-
 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
 
 

=== modified file 'src/ipc/UdsOp.cc'
--- src/ipc/UdsOp.cc	2010-08-24 00:12:54 +0000
+++ src/ipc/UdsOp.cc	2010-09-24 07:10:17 +0000
@@ -4,20 +4,18 @@
  * DEBUG: section 54    Interprocess Communication
  *
  */
-
-
 #include "config.h"
+#include "base/TextException.h"
 #include "comm.h"
 #include "CommCalls.h"
-#include "base/TextException.h"
+#include "comm/Connection.h"
 #include "ipc/UdsOp.h"
 
 
 Ipc::UdsOp::UdsOp(const String& pathAddr):
         AsyncJob("Ipc::UdsOp"),
         address(PathToAddress(pathAddr)),
-        options(COMM_NONBLOCKING),
-        fd_(-1)
+        options(COMM_NONBLOCKING)
 {
     debugs(54, 5, HERE << '[' << this << "] pathAddr=" << pathAddr);
 }
@@ -25,8 +23,9 @@
 Ipc::UdsOp::~UdsOp()
 {
     debugs(54, 5, HERE << '[' << this << ']');
-    if (fd_ >= 0)
-        comm_close(fd_);
+    if (Comm::IsConnOpen(conn_))
+        conn_->close();
+    conn_ = NULL;
 }
 
 void Ipc::UdsOp::setOptions(int newOptions)
@@ -34,15 +33,18 @@
     options = newOptions;
 }
 
-int Ipc::UdsOp::fd()
+Comm::ConnectionPointer &
+Ipc::UdsOp::conn()
 {
-    if (fd_ < 0) {
+    if (!Comm::IsConnOpen(conn_)) {
         if (options & COMM_DOBIND)
             unlink(address.sun_path);
-        fd_ = comm_open_uds(SOCK_DGRAM, 0, &address, options);
-        Must(fd_ >= 0);
+        if (conn_ == NULL)
+            conn_ = new Comm::Connection;
+        conn_->fd = comm_open_uds(SOCK_DGRAM, 0, &address, options);
+        Must(Comm::IsConnOpen(conn_));
     }
-    return fd_;
+    return conn_;
 }
 
 void Ipc::UdsOp::setTimeout(int seconds, const char *handlerName)
@@ -50,12 +52,12 @@
     typedef CommCbMemFunT<UdsOp, CommTimeoutCbParams> Dialer;
     AsyncCall::Pointer handler = asyncCall(54,5, handlerName,
                                            Dialer(CbcPointer<UdsOp>(this), &UdsOp::noteTimeout));
-    commSetTimeout(fd(), seconds, handler);
+    commSetTimeout(conn()->fd, seconds, handler);
 }
 
 void Ipc::UdsOp::clearTimeout()
 {
-    commSetTimeout(fd(), -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd)
+    commSetTimeout(conn()->fd, -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd)
 }
 
 void Ipc::UdsOp::noteTimeout(const CommTimeoutCbParams &)
@@ -106,17 +108,17 @@
     typedef CommCbMemFunT<UdsSender, CommIoCbParams> Dialer;
     AsyncCall::Pointer writeHandler = JobCallback(54, 5,
                                       Dialer, this, UdsSender::wrote);
-    comm_write(fd(), message.raw(), message.size(), writeHandler);
+    comm_write(conn(), message.raw(), message.size(), writeHandler);
     writing = true;
 }
 
 void Ipc::UdsSender::wrote(const CommIoCbParams& params)
 {
-    debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
+    debugs(54, 5, HERE << params.conn << " flag " << params.flag << " [" << this << ']');
     writing = false;
     if (params.flag != COMM_OK && retries-- > 0) {
         sleep(1); // do not spend all tries at once; XXX: use an async timed event instead of blocking here; store the time when we started writing so that we do not sleep if not needed?
-        write(); // XXX: should we close on error so that fd() reopens?
+        write(); // XXX: should we close on error so that conn() reopens?
     }
 }
 

=== modified file 'src/ipc/UdsOp.h'
--- src/ipc/UdsOp.h	2010-07-06 18:58:38 +0000
+++ src/ipc/UdsOp.h	2010-09-20 10:07:01 +0000
@@ -11,6 +11,7 @@
 
 #include "SquidString.h"
 #include "base/AsyncJob.h"
+#include "comm/forward.h"
 #include "ipc/TypedMsgHdr.h"
 
 class CommTimeoutCbParams;
@@ -33,7 +34,7 @@
 protected:
     virtual void timedout() {} ///< called after setTimeout() if timed out
 
-    int fd(); ///< creates if needed and returns raw UDS socket descriptor
+    Comm::ConnectionPointer &conn(); ///< creates if needed and returns raw UDS socket descriptor
 
     /// call timedout() if no UDS messages in a given number of seconds
     void setTimeout(int seconds, const char *handlerName);
@@ -47,7 +48,7 @@
 
 private:
     int options; ///< UDS options
-    int fd_; ///< UDS descriptor
+    Comm::ConnectionPointer conn_; ///< UDS descriptor
 
 private:
     UdsOp(const UdsOp &); // not implemented

Reply via email to