On 19/09/10 23:35, Amos Jeffries wrote:
Round 2 of only src/comm/ changes.
I believe this accounts for all the bits requested to date. Including
adding Subscriptions for ConnAcceptor.
Amos
... and here is the patch. :(
Amos
=== modified file 'src/comm/AcceptLimiter.cc'
--- src/comm/AcceptLimiter.cc 2009-12-31 02:35:01 +0000
+++ src/comm/AcceptLimiter.cc 2010-09-18 02:09:49 +0000
@@ -1,6 +1,7 @@
#include "config.h"
#include "comm/AcceptLimiter.h"
-#include "comm/ListenStateData.h"
+#include "comm/ConnAcceptor.h"
+#include "comm/Connection.h"
#include "fde.h"
Comm::AcceptLimiter Comm::AcceptLimiter::Instance_;
@@ -11,22 +12,41 @@
}
void
-Comm::AcceptLimiter::defer(Comm::ListenStateData *afd)
+Comm::AcceptLimiter::defer(Comm::ConnAcceptor *afd)
{
afd->isLimited++;
- debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
+ debugs(5, 5, HERE << afd->conn << " x" << afd->isLimited);
deferred.push_back(afd);
}
void
+Comm::AcceptLimiter::removeDead(const Comm::ConnAcceptor *afd)
+{
+ for (unsigned int i = 0; i < deferred.size() && afd->isLimited > 0; i++) {
+ if (deferred[i] == afd) {
+ deferred[i]->isLimited--;
+ deferred[i] = NULL; // fast. kick() will skip empty entries later.
+ debugs(5, 5, HERE << afd->conn << " x" << afd->isLimited);
+ }
+ }
+}
+
+void
Comm::AcceptLimiter::kick()
{
+ // TODO: this could be optimized further with an iterator to search
+ // looking for first non-NULL, followed by dumping the first N
+ // with only one shift()/pop_ftron operation
+
debugs(5, 5, HERE << " size=" << deferred.size());
- if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
- debugs(5, 5, HERE << " doing one.");
+ while (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
/* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */
- ListenStateData *temp = deferred.shift();
- temp->isLimited--;
- temp->acceptNext();
+ ConnAcceptor *temp = deferred.shift();
+ if (temp != NULL) {
+ debugs(5, 5, HERE << " doing one.");
+ temp->isLimited--;
+ temp->acceptNext();
+ break;
+ }
}
}
=== modified file 'src/comm/AcceptLimiter.h'
--- src/comm/AcceptLimiter.h 2010-01-13 01:13:17 +0000
+++ src/comm/AcceptLimiter.h 2010-09-18 02:11:31 +0000
@@ -1,12 +1,12 @@
-#ifndef _SQUID_SRC_COMM_ACCEPT_LIMITER_H
-#define _SQUID_SRC_COMM_ACCEPT_LIMITER_H
+#ifndef _SQUID_SRC_COMM_ACCEPTLIMITER_H
+#define _SQUID_SRC_COMM_ACCEPTLIMITER_H
#include "Array.h"
namespace Comm
{
-class ListenStateData;
+class ConnAcceptor;
/**
* FIFO Queue holding listener socket handlers which have been activated
@@ -14,7 +14,8 @@
* But when doing so there were not enough FD available to handle the
* new connection. These handlers are awaiting some FD to become free.
*
- * defer - used only by Comm layer ListenStateData adding themselves when FD are limited.
+ * defer - used only by Comm layer ConnAcceptor adding themselves when FD are limited.
+ * removeDead - used only by Comm layer ConnAcceptor to remove themselves when dying.
* kick - used by Comm layer when FD are closed.
*/
class AcceptLimiter
@@ -25,7 +26,10 @@
static AcceptLimiter &Instance();
/** delay accepting a new client connection. */
- void defer(Comm::ListenStateData *afd);
+ void defer(Comm::ConnAcceptor *afd);
+
+ /** remove all records of an acceptor. Only to be called by the ConnAcceptor::swanSong() */
+ void removeDead(const Comm::ConnAcceptor *afd);
/** try to accept and begin processing any delayed client connections. */
void kick();
@@ -34,9 +38,9 @@
static AcceptLimiter Instance_;
/** FIFO queue */
- Vector<Comm::ListenStateData*> deferred;
+ Vector<Comm::ConnAcceptor*> deferred;
};
}; // namepace Comm
-#endif /* _SQUID_SRC_COMM_ACCEPT_LIMITER_H */
+#endif /* _SQUID_SRC_COMM_ACCEPTLIMITER_H */
=== renamed file 'src/comm/ListenStateData.cc' => 'src/comm/ConnAcceptor.cc'
--- src/comm/ListenStateData.cc 2010-08-10 03:11:19 +0000
+++ src/comm/ConnAcceptor.cc 2010-09-18 01:56:02 +0000
@@ -33,15 +33,100 @@
*/
#include "squid.h"
+#include "base/TextException.h"
#include "CommCalls.h"
#include "comm/AcceptLimiter.h"
#include "comm/comm_internal.h"
-#include "comm/ListenStateData.h"
-#include "ConnectionDetail.h"
+#include "comm/Connection.h"
+#include "comm/ConnAcceptor.h"
#include "fde.h"
#include "protos.h"
#include "SquidTime.h"
+namespace Comm {
+ CBDATA_CLASS_INIT(ConnAcceptor);
+};
+
+Comm::ConnAcceptor::ConnAcceptor(const Comm::ConnectionPointer &newConn, const char *note, const Subscription::Pointer &aSub) :
+ AsyncJob("Comm::ConnAcceptor"),
+ errcode(0),
+ isLimited(0),
+ theCallSub(aSub),
+ conn(newConn)
+{
+ assert(newConn != NULL);
+
+ /* open the conn if its not already open */
+ if (!IsConnOpen(conn)) {
+ conn->fd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, conn->local, conn->flags, note);
+ errcode = errno;
+
+ if (!conn->isOpen()) {
+ debugs(5, DBG_CRITICAL, HERE << "comm_open failed: " << conn << " error: " << errcode);
+ conn = NULL;
+ return;
+ }
+ debugs(9, 3, HERE << "Unconnected data socket created on " << conn);
+ }
+ assert(IsConnOpen(newConn));
+}
+
+void
+Comm::ConnAcceptor::subscribe(const Subscription::Pointer &aSub)
+{
+ debugs(5, 5, HERE << conn << " AsyncCall Subscription: " << aSub);
+ unsubscribe("subscription change");
+ theCallSub = aSub;
+}
+
+void
+Comm::ConnAcceptor::unsubscribe(const char *reason)
+{
+ debugs(5, 5, HERE << conn << " AsyncCall Subscription " << theCallSub << " removed: " << reason);
+ theCallSub = NULL;
+}
+
+void
+Comm::ConnAcceptor::start()
+{
+ debugs(5, 5, HERE << conn << " AsyncCall Subscription: " << theCallSub);
+
+ Must(IsConnOpen(conn));
+
+ setListen();
+
+ // if no error so far start accepting connections.
+ if (errcode == 0)
+ commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0);
+}
+
+bool
+Comm::ConnAcceptor::doneAll() const
+{
+ // stop when FD is closed
+ if (!IsConnOpen(conn)) {
+ return AsyncJob::doneAll();
+ }
+
+ // stop when handlers are gone
+ if (theCallSub == NULL) {
+ return AsyncJob::doneAll();
+ }
+
+ // open FD with handlers...keep accepting.
+ return false;
+}
+
+void
+Comm::ConnAcceptor::swanSong()
+{
+ debugs(5,5, HERE);
+ unsubscribe("swanSong");
+ conn = NULL;
+ AcceptLimiter::Instance().removeDead(this);
+ AsyncJob::swanSong();
+}
+
/**
* New-style listen and accept routines
*
@@ -50,11 +135,11 @@
* accept()ed some time later.
*/
void
-Comm::ListenStateData::setListen()
+Comm::ConnAcceptor::setListen()
{
errcode = 0; // reset local errno copy.
- if (listen(fd, Squid_MaxFD >> 2) < 0) {
- debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
+ if (listen(conn->fd, Squid_MaxFD >> 2) < 0) {
+ debugs(50, DBG_CRITICAL, "ERROR: listen(" << conn << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
errcode = errno;
return;
}
@@ -63,40 +148,22 @@
#ifdef SO_ACCEPTFILTER
struct accept_filter_arg afa;
bzero(&afa, sizeof(afa));
- debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd);
+ debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on " << conn);
xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
- if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
- debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
+ if (setsockopt(conn->fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
+ debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
#elif defined(TCP_DEFER_ACCEPT)
int seconds = 30;
if (strncmp(Config.accept_filter, "data=", 5) == 0)
seconds = atoi(Config.accept_filter + 5);
- if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
- debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
+ if (setsockopt(conn->fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
+ debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
#else
- debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
+ debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS");
#endif
}
}
-Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) :
- fd(aFd),
- theCallback(call),
- mayAcceptMore(accept_many)
-{
- assert(aFd >= 0);
- debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
- assert(isOpen(aFd));
- setListen();
- commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
-}
-
-Comm::ListenStateData::~ListenStateData()
-{
- comm_close(fd);
- fd = -1;
-}
-
/**
* This private callback is called whenever a filedescriptor is ready
* to dupe itself and fob off an accept()ed connection
@@ -107,23 +174,30 @@
* done later when enough sockets become available.
*/
void
-Comm::ListenStateData::doAccept(int fd, void *data)
+Comm::ConnAcceptor::doAccept(int fd, void *data)
{
- debugs(5, 2, HERE << "New connection on FD " << fd);
-
- assert(isOpen(fd));
- ListenStateData *afd = static_cast<ListenStateData*>(data);
-
- if (!okToAccept()) {
- AcceptLimiter::Instance().defer(afd);
- } else {
- afd->acceptNext();
+ try {
+ debugs(5, 2, HERE << "New connection on FD " << fd);
+
+ Must(isOpen(fd));
+ ConnAcceptor *afd = static_cast<ConnAcceptor*>(data);
+
+ if (!okToAccept()) {
+ AcceptLimiter::Instance().defer(afd);
+ } else {
+ afd->acceptNext();
+ }
+ commSetSelect(fd, COMM_SELECT_READ, Comm::ConnAcceptor::doAccept, afd, 0);
+
+ } catch(TextException &e) {
+ fatalf("FATAL: error while accepting new client connection: %s\n", e.message);
+ } catch(...) {
+ fatal("FATAL: error while accepting new client connection: [unkown]\n");
}
- commSetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0);
}
bool
-Comm::ListenStateData::okToAccept()
+Comm::ConnAcceptor::okToAccept()
{
static time_t last_warn = 0;
@@ -139,7 +213,7 @@
}
void
-Comm::ListenStateData::acceptOne()
+Comm::ConnAcceptor::acceptOne()
{
/*
* We don't worry about running low on FDs here. Instead,
@@ -148,42 +222,43 @@
*/
/* Accept a new connection */
- ConnectionDetail connDetails;
- int newfd = oldAccept(connDetails);
+ ConnectionPointer newConnDetails = new Connection();
+ comm_err_t status = oldAccept(newConnDetails);
/* Check for errors */
- if (newfd < 0) {
+ if (!newConnDetails->isOpen()) {
- if (newfd == COMM_NOMESSAGE) {
+ if (status == COMM_NOMESSAGE) {
/* register interest again */
- debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback);
- commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+ debugs(5, 5, HERE << "try later: " << conn << " handler Subscription: " << theCallSub);
+ commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0);
return;
}
// A non-recoverable error; notify the caller */
- debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback);
- notify(-1, COMM_ERROR, connDetails);
- mayAcceptMore = false;
+ debugs(5, 5, HERE << "non-recoverable error: " << conn << " handler Subscription: " << theCallSub);
+ notify(status, newConnDetails);
+ mustStop("Listener socket closed");
return;
}
- debugs(5, 5, HERE << "accepted: FD " << fd <<
- " newfd: " << newfd << " from: " << connDetails.peer <<
- " handler: " << theCallback);
- notify(newfd, COMM_OK, connDetails);
+ debugs(5, 5, HERE << "Listener: " << conn <<
+ " accepted new connection " << newConnDetails <<
+ " handler Subscription: " << theCallSub);
+ notify(status, newConnDetails);
}
void
-Comm::ListenStateData::acceptNext()
+Comm::ConnAcceptor::acceptNext()
{
- assert(isOpen(fd));
- debugs(5, 2, HERE << "connection on FD " << fd);
+ Must(IsConnOpen(conn));
+ debugs(5, 2, HERE << "connection on " << conn);
acceptOne();
}
+// NP: can't be a const function because syncWithComm() side effects hit theCallSub->callback().
void
-Comm::ListenStateData::notify(int newfd, comm_err_t flag, const ConnectionDetail &connDetails)
+Comm::ConnAcceptor::notify(comm_err_t flag, const Comm::ConnectionPointer &newConnDetails)
{
// listener socket handlers just abandon the port with COMM_ERR_CLOSING
// it should only happen when this object is deleted...
@@ -191,89 +266,95 @@
return;
}
- if (theCallback != NULL) {
- typedef CommAcceptCbParams Params;
- Params ¶ms = GetCommParams<Params>(theCallback);
- params.fd = fd;
- params.nfd = newfd;
- params.details = connDetails;
+ if (theCallSub != NULL) {
+ AsyncCall::Pointer call = theCallSub->callback();
+ CommAcceptCbParams ¶ms = GetCommParams<CommAcceptCbParams>(call);
+ params.fd = conn->fd;
+ params.conn = newConnDetails;
params.flag = flag;
params.xerrno = errcode;
- ScheduleCallHere(theCallback);
- if (!mayAcceptMore)
- theCallback = NULL;
+ ScheduleCallHere(call);
}
}
/**
* accept() and process
- * Wait for an incoming connection on FD.
+ * Wait for an incoming connection on our listener socket.
+ *
+ * \retval COMM_OK success. details parameter filled.
+ * \retval COMM_NOMESSAGE attempted accept() but nothing useful came in.
+ * \retval COMM_ERROR an outright failure occured.
+ * Or if this client has too many connections already.
*/
-int
-Comm::ListenStateData::oldAccept(ConnectionDetail &details)
+comm_err_t
+Comm::ConnAcceptor::oldAccept(Comm::ConnectionPointer &details)
{
PROF_start(comm_accept);
statCounter.syscalls.sock.accepts++;
int sock;
struct addrinfo *gai = NULL;
- details.me.InitAddrInfo(gai);
+ details->local.InitAddrInfo(gai);
errcode = 0; // reset local errno copy.
- if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
+ if ((sock = accept(conn->fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
errcode = errno; // store last accept errno locally.
- details.me.FreeAddrInfo(gai);
+ details->local.FreeAddrInfo(gai);
PROF_stop(comm_accept);
if (ignoreErrno(errno)) {
- debugs(50, 5, HERE << "FD " << fd << ": " << xstrerror());
+ debugs(50, 5, HERE << conn << ": " << xstrerror());
return COMM_NOMESSAGE;
} else if (ENFILE == errno || EMFILE == errno) {
- debugs(50, 3, HERE << "FD " << fd << ": " << xstrerror());
+ debugs(50, 3, HERE << conn << ": " << xstrerror());
return COMM_ERROR;
} else {
- debugs(50, 1, HERE << "FD " << fd << ": " << xstrerror());
+ debugs(50, 1, HERE << conn << ": " << xstrerror());
return COMM_ERROR;
}
}
- details.peer = *gai;
+ Must(sock >= 0);
+ details->fd = sock;
+ details->remote = *gai;
if ( Config.client_ip_max_connections >= 0) {
- if (clientdbEstablished(details.peer, 0) > Config.client_ip_max_connections) {
- debugs(50, DBG_IMPORTANT, "WARNING: " << details.peer << " attempting more than " << Config.client_ip_max_connections << " connections.");
- details.me.FreeAddrInfo(gai);
+ if (clientdbEstablished(details->remote, 0) > Config.client_ip_max_connections) {
+ debugs(50, DBG_IMPORTANT, "WARNING: " << details->remote << " attempting more than " << Config.client_ip_max_connections << " connections.");
+ details->local.FreeAddrInfo(gai);
return COMM_ERROR;
}
}
- details.me.InitAddrInfo(gai);
-
- details.me.SetEmpty();
+ // lookup the local-end details of this new connection
+ details->local.InitAddrInfo(gai);
+ details->local.SetEmpty();
getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
- details.me = *gai;
-
- commSetCloseOnExec(sock);
+ details->local = *gai;
+ details->local.FreeAddrInfo(gai);
/* fdstat update */
+ // XXX : these are not all HTTP requests. use a note about type and ip:port details->
+ // so we end up with a uniform "(HTTP|FTP-data|HTTPS|...) remote-ip:remote-port"
fd_open(sock, FD_SOCKET, "HTTP Request");
fdd_table[sock].close_file = NULL;
fdd_table[sock].close_line = 0;
fde *F = &fd_table[sock];
- details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
- F->remote_port = details.peer.GetPort();
- F->local_addr.SetPort(details.me.GetPort());
- F->sock_family = details.me.IsIPv6()?AF_INET6:AF_INET;
- details.me.FreeAddrInfo(gai);
+ details->remote.NtoA(F->ipaddr,MAX_IPSTRLEN);
+ F->remote_port = details->remote.GetPort();
+ F->local_addr = details->local;
+ F->sock_family = details->local.IsIPv6()?AF_INET6:AF_INET;
+ // set socket flags
+ commSetCloseOnExec(sock);
commSetNonBlocking(sock);
/* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
- F->flags.transparent = fd_table[fd].flags.transparent;
+ F->flags.transparent = fd_table[conn->fd].flags.transparent;
PROF_stop(comm_accept);
- return sock;
+ return COMM_OK;
}
=== renamed file 'src/comm/ListenStateData.h' => 'src/comm/ConnAcceptor.h'
--- src/comm/ListenStateData.h 2010-08-09 10:48:17 +0000
+++ src/comm/ConnAcceptor.h 2010-09-10 11:31:04 +0000
@@ -1,40 +1,77 @@
-#ifndef SQUID_LISTENERSTATEDATA_H
-#define SQUID_LISTENERSTATEDATA_H
+#ifndef SQUID_COMM_CONNACCEPTOR_H
+#define SQUID_COMM_CONNACCEPTOR_H
#include "config.h"
-#include "base/AsyncCall.h"
-#include "comm.h"
+#include "base/Subscription.h"
+#include "CommCalls.h"
+#include "comm/comm_err_t.h"
+#include "comm/forward.h"
+
#if HAVE_MAP
#include <map>
#endif
-class ConnectionDetail;
-
namespace Comm
{
-class ListenStateData
+class AcceptLimiter;
+
+/**
+ * Listens on a Comm::Connection for new incoming connections and
+ * emits an active Comm::Connection descriptor for the new client.
+ *
+ * Handles all event limiting required to quash inbound connection
+ * floods within the global FD limits of available Squid_MaxFD and
+ * client_ip_max_connections.
+ *
+ * Fills the emitted connection with all connection details able to
+ * be looked up. Currently these are the local/remote IP:port details
+ * and the listening socket transparent-mode flag.
+ */
+class ConnAcceptor : public AsyncJob
{
+private:
+ virtual void start();
+ virtual bool doneAll() const;
+ virtual void swanSong();
public:
- ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many);
- ListenStateData(const ListenStateData &r); // not implemented.
- ~ListenStateData();
-
- void subscribe(AsyncCall::Pointer &call);
+ ConnAcceptor(const Comm::ConnectionPointer &conn, const char *note, const Subscription::Pointer &aSub);
+ ConnAcceptor(const ConnAcceptor &r); // not implemented.
+
+ /** Subscribe a handler to receive calls back about new connections.
+ * Replaces any existing subscribed handler.
+ */
+ void subscribe(const Subscription::Pointer &aSub);
+
+ /** Remove the currently waiting callback subscription.
+ * Pending calls will remain scheduled.
+ */
+ void unsubscribe(const char *reason);
+
+ /** Try and accept another connection (synchronous).
+ * If one is pending already the subscribed callback handler will be scheduled
+ * to handle it before this method returns.
+ */
void acceptNext();
- void notify(int newfd, comm_err_t flag, const ConnectionDetail &details);
- int fd;
+ /// Call the subscribed callback handler with details about a new connection.
+ void notify(comm_err_t flag, const Comm::ConnectionPointer &details);
/// errno code of the last accept() or listen() action if one occurred.
int errcode;
- /// whether this socket is delayed and on the AcceptLimiter queue.
- int32_t isLimited;
-
-private:
- /// Method to test if there are enough file escriptors to open a new client connection
+private:
+ friend class AcceptLimiter;
+ int32_t isLimited; ///< whether this socket is delayed and on the AcceptLimiter queue.
+ Subscription::Pointer theCallSub; ///< used to generate AsyncCalls handling our events.
+
+ /// conn being listened on for new connections
+ /// Reserved for read-only use.
+ ConnectionPointer conn;
+
+private:
+ /// Method to test if there are enough file descriptors to open a new client connection
/// if not the accept() will be postponed
static bool okToAccept();
@@ -42,14 +79,12 @@
static void doAccept(int fd, void *data);
void acceptOne();
- int oldAccept(ConnectionDetail &details);
-
- AsyncCall::Pointer theCallback;
- bool mayAcceptMore;
-
+ comm_err_t oldAccept(Comm::ConnectionPointer &details);
void setListen();
+
+ CBDATA_CLASS2(ConnAcceptor);
};
}; // namespace Comm
-#endif /* SQUID_LISTENERSTATEDATA_H */
+#endif /* SQUID_COMM_CONNACCEPTOR_H */
=== added file 'src/comm/ConnOpener.cc'
--- src/comm/ConnOpener.cc 1970-01-01 00:00:00 +0000
+++ src/comm/ConnOpener.cc 2010-09-05 04:12:56 +0000
@@ -0,0 +1,290 @@
+/*
+ * DEBUG: section 05 Socket Connection Opener
+ */
+
+#include "config.h"
+#include "base/TextException.h"
+#include "comm/ConnOpener.h"
+#include "comm/Connection.h"
+#include "comm.h"
+#include "fde.h"
+#include "icmp/net_db.h"
+#include "SquidTime.h"
+
+namespace Comm {
+ CBDATA_CLASS_INIT(ConnOpener);
+};
+
+Comm::ConnOpener::ConnOpener(Comm::ConnectionPointer &c, AsyncCall::Pointer &handler, time_t ctimeout) :
+ AsyncJob("Comm::ConnOpener"),
+ host_(NULL),
+ conn_(c),
+ callback_(handler),
+ totalTries_(0),
+ failRetries_(0),
+ connectTimeout_(ctimeout),
+ connStart_(0)
+{}
+
+Comm::ConnOpener::~ConnOpener()
+{
+ safe_free(host_);
+}
+
+bool
+Comm::ConnOpener::doneAll() const
+{
+ // is the conn_ to be opened still waiting?
+ if (conn_ != NULL) {
+ return false;
+ }
+
+ // is the callback still to be called?
+ if (callback_ != NULL) {
+ return false;
+ }
+
+ return AsyncJob::doneAll();
+}
+
+void
+Comm::ConnOpener::swanSong()
+{
+ // cancel any event watchers
+ // done here to get the "swanSong" mention in cancel debugging.
+ if (calls_.earlyAbort_ != NULL) {
+ calls_.earlyAbort_->cancel("Comm::ConnOpener::swanSong");
+ calls_.earlyAbort_ = NULL;
+ }
+ if (calls_.timeout_ != NULL) {
+ calls_.timeout_->cancel("Comm::ConnOpener::swanSong");
+ calls_.timeout_ = NULL;
+ }
+
+ // recover what we can from the job
+ if (conn_ != NULL && conn_->isOpen()) {
+ // it never reached fully open, so abort the FD
+ conn_->close();
+ fd_table[conn_->fd].flags.open = 0;
+ // inform the caller
+ doneConnecting(COMM_ERR_CONNECT, 0);
+ }
+
+ AsyncJob::swanSong();
+}
+
+void
+Comm::ConnOpener::setHost(const char * new_host)
+{
+ // unset and erase if already set.
+ if (host_ != NULL)
+ safe_free(host_);
+
+ // set the new one if given.
+ if (new_host != NULL)
+ host_ = xstrdup(new_host);
+}
+
+const char *
+Comm::ConnOpener::getHost() const
+{
+ return host_;
+}
+
+/**
+ * Connection attempt are completed. One way or the other.
+ * Pass the results back to the external handler.
+ */
+void
+Comm::ConnOpener::doneConnecting(comm_err_t status, int xerrno)
+{
+ if (callback_ != NULL) {
+ typedef CommConnectCbParams Params;
+ Params ¶ms = GetCommParams<Params>(callback_);
+ params.conn = conn_;
+ params.flag = status;
+ params.xerrno = xerrno;
+ ScheduleCallHere(callback_);
+ callback_ = NULL;
+ }
+
+ /* ensure cleared local state, we are done. */
+ conn_ = NULL;
+}
+
+void
+Comm::ConnOpener::start()
+{
+ Must(conn_ != NULL);
+
+ /* get a socket open ready for connecting with */
+ if (!conn_->isOpen()) {
+#if USE_IPV6
+ /* outbound sockets have no need to be protocol agnostic. */
+ if (conn_->remote.IsIPv4()) {
+ conn_->local.SetIPv4();
+ }
+#endif
+ conn_->fd = comm_openex(SOCK_STREAM, IPPROTO_TCP, conn_->local, conn_->flags, conn_->tos, host_);
+ if (!conn_->isOpen()) {
+ doneConnecting(COMM_ERR_CONNECT, 0);
+ return;
+ }
+
+ if (calls_.earlyAbort_ == NULL) {
+ typedef CommCbMemFunT<Comm::ConnOpener, CommConnectCbParams> Dialer;
+ calls_.earlyAbort_ = asyncCall(5, 4, "Comm::ConnOpener::earlyAbort",
+ Dialer(this, &Comm::ConnOpener::earlyAbort));
+ comm_add_close_handler(conn_->fd, calls_.earlyAbort_);
+ }
+
+ if (calls_.timeout_ == NULL) {
+ typedef CommCbMemFunT<Comm::ConnOpener, CommTimeoutCbParams> Dialer;
+ calls_.timeout_ = asyncCall(5, 4, "Comm::ConnOpener::timeout",
+ Dialer(this, &Comm::ConnOpener::timeout));
+ debugs(5, 3, HERE << conn_ << " timeout " << connectTimeout_);
+ commSetTimeout(conn_->fd, connectTimeout_, calls_.timeout_);
+ }
+
+ if (connStart_ == 0) {
+ connStart_ = squid_curtime;
+ }
+ }
+
+ typedef CommCbMemFunT<Comm::ConnOpener, CommConnectCbParams> Dialer;
+ calls_.connect_ = asyncCall(5, 4, "Comm::ConnOpener::connect",
+ Dialer(this, &Comm::ConnOpener::connect));
+ ScheduleCallHere(calls_.connect_);
+}
+
+/** Make an FD connection attempt.
+ * Handles the case(s) when a partially setup connection gets closed early.
+ */
+void
+Comm::ConnOpener::connect(const CommConnectCbParams &unused)
+{
+ Must(conn_ != NULL);
+
+ totalTries_++;
+
+ switch (comm_connect_addr(conn_->fd, conn_->remote) ) {
+
+ case COMM_INPROGRESS:
+ // check for timeout FIRST.
+ if(squid_curtime - connStart_ > connectTimeout_) {
+ debugs(5, 5, HERE << conn_ << ": * - ERR took too long already.");
+ doneConnecting(COMM_TIMEOUT, errno);
+ return;
+ } else {
+ debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS");
+ commSetSelect(conn_->fd, COMM_SELECT_WRITE, Comm::ConnOpener::ConnectRetry, this, 0);
+ }
+ break;
+
+ case COMM_OK:
+ debugs(5, 5, HERE << conn_ << ": COMM_OK - connected");
+
+ /*
+ * stats.conn_open is used to account for the number of
+ * connections that we have open to the peer, so we can limit
+ * based on the max-conn option. We need to increment here,
+ * even if the connection may fail.
+ */
+ if (conn_->getPeer())
+ conn_->getPeer()->stats.conn_open++;
+
+ lookupLocalAddress();
+
+ /* TODO: remove these fd_table accesses. But old code still depends on fd_table flags to
+ * indicate the state of a raw fd object being passed around.
+ * Also, legacy code still depends on comm_local_port() with no access to Comm::Connection
+ * when those are done comm_local_port can become one of our member functions to do the below.
+ */
+ fd_table[conn_->fd].flags.open = 1;
+ fd_table[conn_->fd].local_addr = conn_->local;
+
+ if (host_ != NULL)
+ ipcacheMarkGoodAddr(host_, conn_->remote);
+ doneConnecting(COMM_OK, 0);
+ break;
+
+ default:
+ debugs(5, 5, HERE << conn_ << ": * - try again");
+ failRetries_++;
+ if (host_ != NULL)
+ ipcacheMarkBadAddr(host_, conn_->remote);
+#if USE_ICMP
+ if (Config.onoff.test_reachability)
+ netdbDeleteAddrNetwork(conn_->remote);
+#endif
+
+ // check for timeout FIRST.
+ if(squid_curtime - connStart_ > connectTimeout_) {
+ debugs(5, 5, HERE << conn_ << ": * - ERR took too long already.");
+ doneConnecting(COMM_TIMEOUT, errno);
+ } else if (failRetries_ < Config.connect_retries) {
+ ScheduleCallHere(calls_.connect_);
+ } else {
+ // send ERROR back to the upper layer.
+ debugs(5, 5, HERE << conn_ << ": * - ERR tried too many times already.");
+ doneConnecting(COMM_ERR_CONNECT, errno);
+ }
+ }
+}
+
+/**
+ * Lookup local-end address and port of the TCP link just opened.
+ * This ensure the connection local details are set correctly
+ */
+void
+Comm::ConnOpener::lookupLocalAddress()
+{
+ struct addrinfo *addr = NULL;
+ conn_->local.InitAddrInfo(addr);
+
+ if (getsockname(conn_->fd, addr->ai_addr, &(addr->ai_addrlen)) != 0) {
+ debugs(50, DBG_IMPORTANT, "ERROR: Failed to retrieve TCP/UDP details for socket: " << conn_ << ": " << xstrerror());
+ conn_->local.FreeAddrInfo(addr);
+ return;
+ }
+
+ conn_->local = *addr;
+ conn_->local.FreeAddrInfo(addr);
+ debugs(5, 6, HERE << conn_);
+}
+
+/** Abort connection attempt.
+ * Handles the case(s) when a partially setup connection gets closed early.
+ */
+void
+Comm::ConnOpener::earlyAbort(const CommConnectCbParams &io)
+{
+ debugs(5, 3, HERE << io.conn);
+ doneConnecting(COMM_ERR_CLOSING, io.xerrno); // NP: is closing or shutdown better?
+}
+
+/**
+ * Handles the case(s) when a partially setup connection gets timed out.
+ * NP: When commSetTimeout accepts generic CommCommonCbParams this can die.
+ */
+void
+Comm::ConnOpener::timeout(const CommTimeoutCbParams &unused)
+{
+ ScheduleCallHere(calls_.connect_);
+}
+
+/* Legacy Wrapper for the retry event after COMM_INPROGRESS
+ * TODO: As soon as comm IO accepts Async calls we can use a ConnOpener::connect call
+ */
+void
+Comm::ConnOpener::ConnectRetry(int fd, void *data)
+{
+ ConnOpener *cs = static_cast<Comm::ConnOpener *>(data);
+
+ // Ew. we are now outside the all AsyncJob protections.
+ // get back inside by scheduling another call...
+ typedef CommCbMemFunT<Comm::ConnOpener, CommConnectCbParams> Dialer;
+ AsyncCall::Pointer call = asyncCall(5, 4, "Comm::ConnOpener::connect",
+ Dialer(cs, &Comm::ConnOpener::connect));
+ ScheduleCallHere(call);
+}
=== added file 'src/comm/ConnOpener.h'
--- src/comm/ConnOpener.h 1970-01-01 00:00:00 +0000
+++ src/comm/ConnOpener.h 2010-09-06 12:06:44 +0000
@@ -0,0 +1,72 @@
+#ifndef _SQUID_SRC_COMM_OPENERSTATEDATA_H
+#define _SQUID_SRC_COMM_OPENERSTATEDATA_H
+
+#include "base/AsyncCall.h"
+#include "base/AsyncJob.h"
+#include "cbdata.h"
+#include "CommCalls.h"
+#include "comm/comm_err_t.h"
+#include "comm/forward.h"
+
+namespace Comm {
+
+/**
+ * Async-opener of a Comm connection.
+ */
+class ConnOpener : public AsyncJob
+{
+protected:
+ virtual void start();
+ virtual void swanSong();
+
+public:
+ virtual bool doneAll() const;
+
+ ConnOpener(Comm::ConnectionPointer &, AsyncCall::Pointer &handler, time_t connect_timeout);
+ ~ConnOpener();
+
+ void setHost(const char *); ///< set the hostname note for this connection
+ const char * getHost() const; ///< get the hostname noted for this connection
+
+private:
+ // Undefined because two openers cannot share a connection
+ ConnOpener(const ConnOpener &);
+ ConnOpener & operator =(const ConnOpener &c);
+
+ void connect(const CommConnectCbParams &unused);
+ void earlyAbort(const CommConnectCbParams &);
+ void timeout(const CommTimeoutCbParams &unused);
+ void doneConnecting(comm_err_t status, int xerrno);
+ static void ConnectRetry(int fd, void *data);
+ void lookupLocalAddress();
+
+private:
+ char *host_; ///< domain name we are trying to connect to.
+ Comm::ConnectionPointer conn_; ///< single connection currently being opened.
+ AsyncCall::Pointer callback_; ///< handler to be called on connection completion.
+
+ int totalTries_; ///< total number of connection attempts over all destinations so far.
+ int failRetries_; ///< number of retries current destination has been tried.
+
+ /**
+ * time at which to abandon the connection.
+ * the connection-done callback will be passed COMM_TIMEOUT
+ */
+ time_t connectTimeout_;
+
+ /// time at which this series of connection attempts was started.
+ time_t connStart_;
+
+ /// handles to calls which we may need to cancel.
+ struct Calls {
+ AsyncCall::Pointer connect_;
+ AsyncCall::Pointer earlyAbort_;
+ AsyncCall::Pointer timeout_;
+ } calls_;
+
+ CBDATA_CLASS2(ConnOpener);
+};
+
+}; // namespace Comm
+
+#endif /* _SQUID_SRC_COMM_CONNOPENER_H */
=== added file 'src/comm/Connection.cc'
--- src/comm/Connection.cc 1970-01-01 00:00:00 +0000
+++ src/comm/Connection.cc 2010-08-13 09:20:32 +0000
@@ -0,0 +1,77 @@
+#include "config.h"
+#include "cbdata.h"
+#include "comm.h"
+#include "comm/Connection.h"
+
+bool
+Comm::IsConnOpen(const Comm::ConnectionPointer &conn)
+{
+ return conn != NULL && conn->isOpen();
+}
+
+
+Comm::Connection::Connection() :
+ local(),
+ remote(),
+ peerType(HIER_NONE),
+ fd(-1),
+ tos(0),
+ flags(COMM_NONBLOCKING),
+ _peer(NULL)
+{}
+
+Comm::Connection::~Connection()
+{
+ close();
+ cbdataReferenceDone(_peer);
+}
+
+Comm::ConnectionPointer
+Comm::Connection::copyDetails() const
+{
+ ConnectionPointer c = new Comm::Connection;
+
+ c->local = local;
+ c->remote = remote;
+ c->peerType = peerType;
+ c->tos = tos;
+ c->flags = flags;
+
+ // ensure FD is not open in the new copy.
+ c->fd = -1;
+
+ // ensure we have a cbdata reference to _peer not a straight ptr copy.
+ c->_peer = cbdataReference(_peer);
+
+ return c;
+}
+
+void
+Comm::Connection::close()
+{
+ if (isOpen()) {
+ comm_close(fd);
+ fd = -1;
+ if (_peer)
+ _peer->stats.conn_open--;
+ }
+}
+
+void
+Comm::Connection::setPeer(peer *p)
+{
+ /* set to self. nothing to do. */
+ if (_peer == p)
+ return;
+
+ /* clear any previous ptr */
+ if (_peer) {
+ cbdataReferenceDone(_peer);
+ _peer = NULL;
+ }
+
+ /* set the new one (unless it is NULL */
+ if (p) {
+ _peer = cbdataReference(p);
+ }
+}
=== renamed file 'src/ConnectionDetail.h' => 'src/comm/Connection.h'
--- src/ConnectionDetail.h 2010-05-26 03:06:02 +0000
+++ src/comm/Connection.h 2010-08-17 07:45:20 +0000
@@ -1,5 +1,6 @@
/*
* DEBUG: section 05 Socket Functions
+ * AUTHOR: Amos Jeffries
* AUTHOR: Robert Collins
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
@@ -30,23 +31,138 @@
*
*
* Copyright (c) 2003, Robert Collins <[email protected]>
+ * Copyright (c) 2010, Amos Jeffries <[email protected]>
*/
#ifndef _SQUIDCONNECTIONDETAIL_H_
#define _SQUIDCONNECTIONDETAIL_H_
+#include "config.h"
+#include "comm/forward.h"
+#include "hier_code.h"
#include "ip/Address.h"
-
-class ConnectionDetail
+#include "RefCount.h"
+
+#if HAVE_IOSFWD
+#include <iosfwd>
+#endif
+#if HAVE_OSTREAM
+#include <ostream>
+#endif
+
+struct peer;
+
+namespace Comm {
+
+/* TODO: make these a struct of boolean flags members in the connection instead of a bitmap.
+ * we can't do that until all non-comm code uses Commm::Connection objects to create FD
+ * currently there is code still using comm_open() and comm_openex() synchronously!!
+ */
+#define COMM_UNSET 0x00
+#define COMM_NONBLOCKING 0x01
+#define COMM_NOCLOEXEC 0x02
+#define COMM_REUSEADDR 0x04
+#define COMM_TRANSPARENT 0x08
+#define COMM_DOBIND 0x10
+
+/**
+ * Store data about the physical and logical attributes of a connection.
+ *
+ * Some link state can be infered from the data, however this is not an
+ * object for state data. But a semantic equivalent for FD with easily
+ * accessible cached properties not requiring repeated complex lookups.
+ *
+ * While the properties may be changed, this is for teh purpose of creating
+ * potential connection descriptors which may be opened. Properties should
+ * be considered read-only outside of the Comm layer code once the connection
+ * is open.
+ *
+ * These objects must not be passed around directly,
+ * but a Comm::ConnectionPointer must be passed instead.
+ */
+class Connection : public RefCountable
{
-
-public:
-
- ConnectionDetail();
-
- Ip::Address me;
-
- Ip::Address peer;
+public:
+ /** standard empty connection creation */
+ Connection();
+
+ /** Clear the connection properties and close any open socket. */
+ ~Connection();
+
+ /** Copy an existing connections IP and properties.
+ * This excludes the FD. The new copy will be a closed connection.
+ */
+ ConnectionPointer copyDetails() const;
+
+ /** Close any open socket. */
+ void close();
+
+ /** determine whether this object describes an active connection or not. */
+ bool isOpen() const { return (fd >= 0); }
+
+ /** retrieve the peer pointer for use.
+ * The caller is responsible for all CBDATA operations regarding the
+ * used of the pointer returned.
+ */
+ peer * const getPeer() const { return _peer; }
+
+ /** alter the stored peer pointer.
+ * Perform appropriate CBDATA operations for locking the peer pointer
+ */
+ void setPeer(peer * p);
+
+private:
+ /** These objects may not be exactly duplicated. Use copyDetails() instead. */
+ Connection(const Connection &c);
+
+ /** These objects may not be exactly duplicated. Use copyDetails() instead. */
+ Connection & operator =(const Connection &c);
+
+public:
+ /** Address/Port for the Squid end of a TCP link. */
+ Ip::Address local;
+
+ /** Address for the Remote end of a TCP link. */
+ Ip::Address remote;
+
+ /** Hierarchy code for this connection link */
+ hier_code peerType;
+
+ /** Socket used by this connection. -1 if no socket has been opened. */
+ int fd;
+
+ /** Quality of Service TOS values currently sent on this connection */
+ int tos;
+
+ /** COMM flags set on this connection */
+ int flags;
+
+private:
+ /** cache_peer data object (if any) */
+ peer *_peer;
};
+}; // namespace Comm
+
+
+// NP: Order and namespace here is very important.
+// * The second define inlines the first.
+// * Stream inheritance overloading is searched in the global scope first.
+
+inline std::ostream &
+operator << (std::ostream &os, const Comm::Connection &conn)
+{
+ os << "FD " << conn.fd << " local=" << conn.local <<
+ " remote=" << conn.remote << " flags=" << conn.flags;
+ return os;
+}
+
+inline std::ostream &
+operator << (std::ostream &os, const Comm::ConnectionPointer &conn)
+{
+ if (conn != NULL)
+ os << *conn;
+ return os;
+}
+
#endif
=== modified file 'src/comm/Makefile.am'
--- src/comm/Makefile.am 2009-12-31 02:35:01 +0000
+++ src/comm/Makefile.am 2010-08-15 07:52:51 +0000
@@ -1,13 +1,22 @@
include $(top_srcdir)/src/Common.am
include $(top_srcdir)/src/TestHeaders.am
-noinst_LTLIBRARIES = libcomm-listener.la
+noinst_LTLIBRARIES = libcomm.la
-## Library holding listener comm socket handlers
-libcomm_listener_la_SOURCES= \
+## First group are listener comm socket handlers
+## Second group are outbound connection setup handlers
+## Third group are misc shared comm objects
+libcomm_la_SOURCES= \
AcceptLimiter.cc \
AcceptLimiter.h \
- ListenStateData.cc \
- ListenStateData.h \
- \
- comm_internal.h
+ ConnAcceptor.cc \
+ ConnAcceptor.h \
+ \
+ ConnOpener.cc \
+ ConnOpener.h \
+ \
+ Connection.cc \
+ Connection.h \
+ comm_err_t.h \
+ comm_internal.h \
+ forward.h
=== added file 'src/comm/comm_err_t.h'
--- src/comm/comm_err_t.h 1970-01-01 00:00:00 +0000
+++ src/comm/comm_err_t.h 2010-05-15 14:57:24 +0000
@@ -0,0 +1,21 @@
+#ifndef _SQUID_COMM_COMM_ERR_T_H
+#define _SQUID_COMM_COMM_ERR_T_H
+
+#include "config.h"
+
+typedef enum {
+ COMM_OK = 0,
+ COMM_ERROR = -1,
+ COMM_NOMESSAGE = -3,
+ COMM_TIMEOUT = -4,
+ COMM_SHUTDOWN = -5,
+ COMM_IDLE = -6, /* there are no active fds and no pending callbacks. */
+ COMM_INPROGRESS = -7,
+ COMM_ERR_CONNECT = -8,
+ COMM_ERR_DNS = -9,
+ COMM_ERR_CLOSING = -10,
+ COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */
+ COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */
+} comm_err_t;
+
+#endif /* _SQUID_COMM_COMM_ERR_T_H */
=== added file 'src/comm/forward.h'
--- src/comm/forward.h 1970-01-01 00:00:00 +0000
+++ src/comm/forward.h 2010-07-24 05:23:58 +0000
@@ -0,0 +1,19 @@
+#ifndef _SQUID_COMM_FORWARD_H
+#define _SQUID_COMM_FORWARD_H
+
+#include "Array.h"
+#include "RefCount.h"
+
+namespace Comm {
+
+class Connection;
+
+typedef RefCount<Comm::Connection> ConnectionPointer;
+
+typedef Vector<Comm::ConnectionPointer> ConnectionList;
+
+bool IsConnOpen(const Comm::ConnectionPointer &conn);
+
+}; // namespace Comm
+
+#endif /* _SQUID_COMM_FORWARD_H */