Two patches:
patch #1:
This builds on the changes already audited in src/comm and src/base
so that TCP listening ports are opened with a ConnAcceptor and a
Subscription (used to emit the HTTP or HTTPS client connect calls).
Alex, can you check that I have understood the IPC systems correctly?
NP: It is now also possible to implement the UDP receivers (SNMP,
DNS, ICP, multicast ICP, HTCP) in the event subscription model
instead of the reader model. I have not done that, though.
Amos
--
Please be using
Current Stable Squid 2.7.STABLE9 or 3.1.8
Beta testers wanted for 3.2.0.2
=== modified file 'src/ipc/Coordinator.cc'
--- src/ipc/Coordinator.cc 2010-07-06 23:09:44 +0000
+++ src/ipc/Coordinator.cc 2010-09-24 09:29:17 +0000
@@ -7,7 +7,9 @@
#include "config.h"
+#include "base/Subscription.h"
#include "comm.h"
+#include "comm/Connection.h"
#include "ipc/Coordinator.h"
#include "ipc/FdNotes.h"
#include "ipc/SharedListen.h"
@@ -81,20 +83,20 @@
" needs shared listen FD for " << request.params.addr);
Listeners::const_iterator i = listeners.find(request.params);
int errNo = 0;
- const int sock = (i != listeners.end()) ?
+ const Comm::ConnectionPointer c = (i != listeners.end()) ?
i->second : openListenSocket(request, errNo);
- debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " <<
+ debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
request.params.addr << " to kid" << request.requestorId <<
" mapId=" << request.mapId);
- SharedListenResponse response(sock, errNo, request.mapId);
+ SharedListenResponse response(c, errNo, request.mapId);
TypedMsgHdr message;
response.pack(message);
SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
}
-int
+Comm::ConnectionPointer
Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
int &errNo)
{
@@ -103,19 +105,23 @@
debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
request.requestorId);
- Ip::Address addr = p.addr; // comm_open_listener may modify it
+ Comm::ConnectionPointer conn = new Comm::Connection;
+ conn->local = p.addr; // comm_open_listener may modify it
+ conn->flags = p.flags;
enter_suid();
- const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags,
- FdNote(p.fdNote));
- errNo = (sock >= 0) ? 0 : errno;
+ comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote));
+ errNo = Comm::IsConnOpen(conn) ? 0 : errno;
leave_suid();
+ debugs(54, 6, HERE << "tried listening on " << conn << " for kid" <<
+ request.requestorId);
+
// cache positive results
- if (sock >= 0)
- listeners[request.params] = sock;
+ if (Comm::IsConnOpen(conn))
+ listeners[request.params] = conn;
- return sock;
+ return conn;
}
void Ipc::Coordinator::broadcastSignal(int sig) const
=== modified file 'src/ipc/Coordinator.h'
--- src/ipc/Coordinator.h 2010-07-05 21:28:23 +0000
+++ src/ipc/Coordinator.h 2010-09-24 07:09:11 +0000
@@ -41,13 +41,13 @@
void handleSharedListenRequest(const SharedListenRequest& request);
/// calls comm_open_listener()
- int openListenSocket(const SharedListenRequest& request, int &errNo);
+ Comm::ConnectionPointer openListenSocket(const SharedListenRequest& request, int &errNo);
private:
typedef Vector<StrandCoord> Strands; ///< unsorted strands
Strands strands; ///< registered processes and threads
- typedef std::map<OpenListenerParams, int> Listeners; ///< params:fd map
+ typedef std::map<OpenListenerParams, Comm::ConnectionPointer> Listeners; ///< params:fd map
Listeners listeners; ///< cached comm_open_listener() results
static Coordinator* TheInstance; ///< the only class instance in existence
=== modified file 'src/ipc/Port.cc'
--- src/ipc/Port.cc 2010-08-24 00:12:54 +0000
+++ src/ipc/Port.cc 2010-10-02 10:16:26 +0000
@@ -5,9 +5,10 @@
*
*/
-
#include "config.h"
+#include "comm.h"
#include "CommCalls.h"
+#include "comm/Connection.h"
#include "ipc/Port.h"
const char Ipc::coordinatorAddr[] = DEFAULT_PREFIX "/var/run/coordinator.ipc";
@@ -33,7 +34,7 @@
typedef CommCbMemFunT<Port, CommIoCbParams> Dialer;
AsyncCall::Pointer readHandler = JobCallback(54, 6,
Dialer, this, Port::noteRead);
- comm_read(fd(), buf.raw(), buf.size(), readHandler);
+ comm_read(conn(), buf.raw(), buf.size(), readHandler);
}
bool Ipc::Port::doneAll() const
@@ -53,7 +54,7 @@
void Ipc::Port::noteRead(const CommIoCbParams& params)
{
- debugs(54, 6, HERE << "FD " << params.fd << " flag " << params.flag <<
+ debugs(54, 6, HERE << params.conn << " flag " << params.flag <<
" [" << this << ']');
if (params.flag == COMM_OK) {
assert(params.buf == buf.raw());
=== modified file 'src/ipc/SharedListen.cc'
--- src/ipc/SharedListen.cc 2010-07-06 23:09:44 +0000
+++ src/ipc/SharedListen.cc 2010-09-24 09:28:25 +0000
@@ -6,9 +6,9 @@
*/
#include "config.h"
-#include <map>
+#include "base/TextException.h"
#include "comm.h"
-#include "base/TextException.h"
+#include "comm/Connection.h"
#include "ipc/Port.h"
#include "ipc/Messages.h"
#include "ipc/Kids.h"
@@ -16,6 +16,7 @@
#include "ipc/StartListening.h"
#include "ipc/SharedListen.h"
+#include <map>
/// holds information necessary to handle JoinListen response
class PendingOpenRequest
@@ -80,22 +81,24 @@
}
-Ipc::SharedListenResponse::SharedListenResponse(int aFd, int anErrNo, int aMapId):
- fd(aFd), errNo(anErrNo), mapId(aMapId)
+Ipc::SharedListenResponse::SharedListenResponse(const Comm::ConnectionPointer &c, int anErrNo, int aMapId):
+ conn(c), errNo(anErrNo), mapId(aMapId)
{
}
Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr &hdrMsg):
- fd(-1), errNo(0), mapId(-1)
+ conn(NULL), errNo(0), mapId(-1)
{
hdrMsg.getData(mtSharedListenResponse, this, sizeof(*this));
- fd = hdrMsg.getFd();
+ conn = new Comm::Connection;
+ conn->fd = hdrMsg.getFd();
+ // other conn details are passed in OpenListenerParams and filled out by SharedListenJoin()
}
void Ipc::SharedListenResponse::pack(TypedMsgHdr &hdrMsg) const
{
hdrMsg.putData(mtSharedListenResponse, this, sizeof(*this));
- hdrMsg.putFd(fd);
+ hdrMsg.putFd(conn->fd);
}
@@ -121,9 +124,10 @@
void Ipc::SharedListenJoined(const SharedListenResponse &response)
{
- const int fd = response.fd;
+ Comm::ConnectionPointer c = response.conn;
- debugs(54, 3, HERE << "got listening FD " << fd << " errNo=" <<
+ // Dont debugs c fully since only FD is filled right now.
+ debugs(54, 3, HERE << "got listening FD " << c->fd << " errNo=" <<
response.errNo << " mapId=" << response.mapId);
Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end());
@@ -131,20 +135,23 @@
Must(por.callback != NULL);
TheSharedListenRequestMap.erase(response.mapId);
- if (fd >= 0) {
+ if (Comm::IsConnOpen(c)) {
OpenListenerParams &p = por.params;
+ c->local = p.addr;
+ c->flags = p.flags;
+ // XXX: leave the comm AI stuff to comm_import_opened()?
struct addrinfo *AI = NULL;
p.addr.GetAddrInfo(AI);
AI->ai_socktype = p.sock_type;
AI->ai_protocol = p.proto;
- comm_import_opened(fd, p.addr, p.flags, FdNote(p.fdNote), AI);
+ comm_import_opened(c, FdNote(p.fdNote), AI);
p.addr.FreeAddrInfo(AI);
}
- StartListeningCb *cbd =
- dynamic_cast<StartListeningCb*>(por.callback->getDialer());
+ StartListeningCb *cbd = dynamic_cast<StartListeningCb*>(por.callback->getDialer());
Must(cbd);
- cbd->fd = fd;
+ cbd->conn = c;
cbd->errNo = response.errNo;
+ cbd->handlerSubscription = por.params.handlerSubscription;
ScheduleCallHere(por.callback);
}
=== modified file 'src/ipc/SharedListen.h'
--- src/ipc/SharedListen.h 2010-07-06 23:09:44 +0000
+++ src/ipc/SharedListen.h 2010-09-24 09:23:04 +0000
@@ -15,7 +15,8 @@
/// "shared listen" is when concurrent processes are listening on the same fd
-/// comm_open_listener() parameters holder
+/// Comm::ConnAcceptor parameters holder
+/// all the details necessary to recreate a Comm::Connection and fde entry for the kid listener FD
class OpenListenerParams
{
public:
@@ -23,11 +24,17 @@
bool operator <(const OpenListenerParams &p) const; ///< useful for map<>
+ // bits to re-create the fde entry
int sock_type;
int proto;
+ int fdNote; ///< index into fd_note() comment strings
+
+ // bits to re-create the listener Comm::Connection descriptor
Ip::Address addr; ///< will be memset and memcopied
int flags;
- int fdNote; ///< index into fd_note() comment strings
+
+ /// handler to subscribe to Comm::ConnAcceptor when we get the response
+ Subscription::Pointer handlerSubscription;
};
class TypedMsgHdr;
@@ -52,12 +59,12 @@
class SharedListenResponse
{
public:
- SharedListenResponse(int fd, int errNo, int mapId);
+ SharedListenResponse(const Comm::ConnectionPointer &c, int errNo, int mapId);
explicit SharedListenResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
public:
- int fd; ///< opened listening socket or -1
+ Comm::ConnectionPointer conn; ///< opened listening socket or -1
int errNo; ///< errno value from comm_open_sharedListen() call
int mapId; ///< to map future response to the requestor's callback
};
=== modified file 'src/ipc/StartListening.cc'
--- src/ipc/StartListening.cc 2010-07-06 23:09:44 +0000
+++ src/ipc/StartListening.cc 2010-10-02 02:23:20 +0000
@@ -6,13 +6,16 @@
*/
#include "config.h"
+#include "base/Subscription.h"
+#include "base/TextException.h"
#include "comm.h"
-#include "base/TextException.h"
+#include "comm/ConnAcceptor.h"
+#include "comm/Connection.h"
#include "ipc/SharedListen.h"
#include "ipc/StartListening.h"
-Ipc::StartListeningCb::StartListeningCb(): fd(-1), errNo(0)
+Ipc::StartListeningCb::StartListeningCb(): conn(NULL), errNo(0)
{
}
@@ -22,37 +25,44 @@
std::ostream &Ipc::StartListeningCb::startPrint(std::ostream &os) const
{
- return os << "(FD " << fd << ", err=" << errNo;
+ return os << "(" << conn << ", err=" << errNo;
}
-
-void Ipc::StartListening(int sock_type, int proto, Ip::Address &addr,
- int flags, FdNoteId fdNote, AsyncCall::Pointer &callback)
+void
+Ipc::StartListening(int sock_type, int proto, const Comm::ConnectionPointer &listenConn,
+ FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub)
{
- OpenListenerParams p;
- p.sock_type = sock_type;
- p.proto = proto;
- p.addr = addr;
- p.flags = flags;
- p.fdNote = fdNote;
-
if (UsingSmp()) { // if SMP is on, share
+ OpenListenerParams p;
+ p.sock_type = sock_type;
+ p.proto = proto;
+ p.addr = listenConn->local;
+ p.flags = listenConn->flags;
+ p.fdNote = fdNote;
+ p.handlerSubscription = sub;
+
Ipc::JoinSharedListen(p, callback);
return; // wait for the call back
}
+ StartListeningCb *cbd = dynamic_cast<StartListeningCb*>(callback->getDialer());
+ Must(cbd);
+ cbd->conn = listenConn;
+
enter_suid();
- const int sock = comm_open_listener(p.sock_type, p.proto, p.addr, p.flags,
- FdNote(p.fdNote));
- const int errNo = (sock >= 0) ? 0 : errno;
+ if (sock_type == SOCK_STREAM) {
+ // TCP: setup the subscriptions such that new connections accepted by listenConn are handled by HTTP
+ AsyncJob::Start(new Comm::ConnAcceptor(cbd->conn, FdNote(fdNote), sub));
+ } else if (sock_type == SOCK_DGRAM) {
+ // UDP: setup the listener socket, but do not set a subscriber
+ Comm::ConnectionPointer udpConn = listenConn;
+ comm_open_listener(sock_type, proto, udpConn, FdNote(fdNote));
+ } else {
+ fatalf("Invalid Socket Type (%d)",sock_type);
+ }
+ cbd->errNo = cbd->conn->isOpen() ? 0 : errno;
leave_suid();
- debugs(54, 3, HERE << "opened listen FD " << sock << " for " << p.addr);
-
- StartListeningCb *cbd =
- dynamic_cast<StartListeningCb*>(callback->getDialer());
- Must(cbd);
- cbd->fd = sock;
- cbd->errNo = errNo;
+ debugs(54, 3, HERE << "opened listen " << cbd->conn);
ScheduleCallHere(callback);
}
=== modified file 'src/ipc/StartListening.h'
--- src/ipc/StartListening.h 2010-07-25 08:10:12 +0000
+++ src/ipc/StartListening.h 2010-09-24 09:06:49 +0000
@@ -9,9 +9,11 @@
#define SQUID_IPC_START_LISTENING_H
#include "config.h"
+#include "base/AsyncCall.h"
+#include "base/Subscription.h"
+#include "comm/forward.h"
#include "ip/forward.h"
#include "ipc/FdNotes.h"
-#include "base/AsyncCall.h"
#if HAVE_IOSFWD
#include <iosfwd>
@@ -31,14 +33,15 @@
std::ostream &startPrint(std::ostream &os) const;
public:
- int fd; ///< opened listening socket or -1
+ Comm::ConnectionPointer conn; ///< opened listening socket
int errNo; ///< errno value from the comm_open_listener() call
+ Subscription::Pointer handlerSubscription; ///< The subscription we will pass on to the ConnAcceptor
};
/// Depending on whether SMP is on, either ask Coordinator to send us
-/// the listening FD or call comm_open_listener() directly.
-extern void StartListening(int sock_type, int proto, Ip::Address &addr,
- int flags, FdNoteId fdNote, AsyncCall::Pointer &callback);
+/// the listening FD or start a connection acceptor directly.
+extern void StartListening(int sock_type, int proto, const Comm::ConnectionPointer &listenConn,
+ FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &handlerSub);
} // namespace Ipc;
=== modified file 'src/ipc/Strand.cc'
--- src/ipc/Strand.cc 2010-07-06 23:09:44 +0000
+++ src/ipc/Strand.cc 2010-09-24 09:29:47 +0000
@@ -6,13 +6,14 @@
*/
#include "config.h"
+#include "base/Subscription.h"
#include "base/TextException.h"
+#include "comm/Connection.h"
#include "ipc/Strand.h"
#include "ipc/Messages.h"
#include "ipc/SharedListen.h"
#include "ipc/Kids.h"
-
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
=== modified file 'src/ipc/UdsOp.cc'
--- src/ipc/UdsOp.cc 2010-08-24 00:12:54 +0000
+++ src/ipc/UdsOp.cc 2010-09-24 07:10:17 +0000
@@ -4,20 +4,18 @@
* DEBUG: section 54 Interprocess Communication
*
*/
-
-
#include "config.h"
+#include "base/TextException.h"
#include "comm.h"
#include "CommCalls.h"
-#include "base/TextException.h"
+#include "comm/Connection.h"
#include "ipc/UdsOp.h"
Ipc::UdsOp::UdsOp(const String& pathAddr):
AsyncJob("Ipc::UdsOp"),
address(PathToAddress(pathAddr)),
- options(COMM_NONBLOCKING),
- fd_(-1)
+ options(COMM_NONBLOCKING)
{
debugs(54, 5, HERE << '[' << this << "] pathAddr=" << pathAddr);
}
@@ -25,8 +23,9 @@
Ipc::UdsOp::~UdsOp()
{
debugs(54, 5, HERE << '[' << this << ']');
- if (fd_ >= 0)
- comm_close(fd_);
+ if (Comm::IsConnOpen(conn_))
+ conn_->close();
+ conn_ = NULL;
}
void Ipc::UdsOp::setOptions(int newOptions)
@@ -34,15 +33,18 @@
options = newOptions;
}
-int Ipc::UdsOp::fd()
+Comm::ConnectionPointer &
+Ipc::UdsOp::conn()
{
- if (fd_ < 0) {
+ if (!Comm::IsConnOpen(conn_)) {
if (options & COMM_DOBIND)
unlink(address.sun_path);
- fd_ = comm_open_uds(SOCK_DGRAM, 0, &address, options);
- Must(fd_ >= 0);
+ if (conn_ == NULL)
+ conn_ = new Comm::Connection;
+ conn_->fd = comm_open_uds(SOCK_DGRAM, 0, &address, options);
+ Must(Comm::IsConnOpen(conn_));
}
- return fd_;
+ return conn_;
}
void Ipc::UdsOp::setTimeout(int seconds, const char *handlerName)
@@ -50,12 +52,12 @@
typedef CommCbMemFunT<UdsOp, CommTimeoutCbParams> Dialer;
AsyncCall::Pointer handler = asyncCall(54,5, handlerName,
Dialer(CbcPointer<UdsOp>(this), &UdsOp::noteTimeout));
- commSetTimeout(fd(), seconds, handler);
+ commSetTimeout(conn()->fd, seconds, handler);
}
void Ipc::UdsOp::clearTimeout()
{
- commSetTimeout(fd(), -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd)
+ commSetTimeout(conn()->fd, -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd)
}
void Ipc::UdsOp::noteTimeout(const CommTimeoutCbParams &)
@@ -106,17 +108,17 @@
typedef CommCbMemFunT<UdsSender, CommIoCbParams> Dialer;
AsyncCall::Pointer writeHandler = JobCallback(54, 5,
Dialer, this, UdsSender::wrote);
- comm_write(fd(), message.raw(), message.size(), writeHandler);
+ comm_write(conn(), message.raw(), message.size(), writeHandler);
writing = true;
}
void Ipc::UdsSender::wrote(const CommIoCbParams& params)
{
- debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
+ debugs(54, 5, HERE << params.conn << " flag " << params.flag << " [" << this << ']');
writing = false;
if (params.flag != COMM_OK && retries-- > 0) {
sleep(1); // do not spend all tries at once; XXX: use an async timed event instead of blocking here; store the time when we started writing so that we do not sleep if not needed?
- write(); // XXX: should we close on error so that fd() reopens?
+ write(); // XXX: should we close on error so that conn() reopens?
}
}
=== modified file 'src/ipc/UdsOp.h'
--- src/ipc/UdsOp.h 2010-07-06 18:58:38 +0000
+++ src/ipc/UdsOp.h 2010-09-20 10:07:01 +0000
@@ -11,6 +11,7 @@
#include "SquidString.h"
#include "base/AsyncJob.h"
+#include "comm/forward.h"
#include "ipc/TypedMsgHdr.h"
class CommTimeoutCbParams;
@@ -33,7 +34,7 @@
protected:
virtual void timedout() {} ///< called after setTimeout() if timed out
- int fd(); ///< creates if needed and returns raw UDS socket descriptor
+ Comm::ConnectionPointer &conn(); ///< creates if needed and returns raw UDS socket descriptor
/// call timedout() if no UDS messages in a given number of seconds
void setTimeout(int seconds, const char *handlerName);
@@ -47,7 +48,7 @@
private:
int options; ///< UDS options
- int fd_; ///< UDS descriptor
+ Comm::ConnectionPointer conn_; ///< UDS descriptor
private:
UdsOp(const UdsOp &); // not implemented