On 07/01/2011 12:07 PM, Tsantilas Christos wrote:
The above can be modified as follows:
- Always increase the "theBusyConns" counter inside
"ServiceRep::getConnection" method (even if we did not return a connection)
- (1)Define a new method "ServiceRep::noteConnectionFailed" which will
decrease the "theBusyConns" counter and will be called from Xaction when
the connection failed OR (2) move the "open-connection" logic in
ServiceRep, where the theBusyConns will be decreased on connection
failures. (The second maybe is not easy...)
- Decrease the "theBusyConns" counter inside ServiceRep::putConnection
The 4th version of the patch, which implements the above .
It still needs some testing, I am posting it here in the case someone
wants to test it.
=== modified file 'src/adaptation/icap/ServiceRep.cc'
--- src/adaptation/icap/ServiceRep.cc 2011-06-17 10:41:10 +0000
+++ src/adaptation/icap/ServiceRep.cc 2011-07-01 15:05:40 +0000
@@ -25,17 +25,19 @@
theBusyConns(0),
theAllWaiters(0),
connOverloadReported(false),
- theIdleConns("ICAP Service",NULL),
isSuspended(0), notifying(false),
updateScheduled(false),
wasAnnouncedUp(true), // do not announce an "up" service at startup
isDetached(false)
{
setMaxConnections();
+ theIdleConns = new IdleConnList("ICAP Service",NULL);
+ assert(theIdleConns);
}
Adaptation::Icap::ServiceRep::~ServiceRep()
{
+ delete theIdleConns;
Must(!theOptionsFetcher);
delete theOptions;
}
@@ -102,16 +104,14 @@
* In other words, (2) tells us to close one FD for each new one we open due to retriable.
*/
if (retriableXact)
- connection = theIdleConns.pop();
+ connection = theIdleConns->pop();
else
- theIdleConns.closeN(1);
+ theIdleConns->closeN(1);
- if (!(reused = Comm::IsConnOpen(connection)))
- connection = new Comm::Connection;
- else {
+ if ((reused = Comm::IsConnOpen(connection))) {
debugs(93,3, HERE << "reused pconn " << connection);
- ++theBusyConns;
}
+ ++theBusyConns;
return connection;
}
@@ -124,7 +124,7 @@
if (isReusable && excessConnections() == 0) {
debugs(93, 3, HERE << "pushing pconn" << comment);
commUnsetConnTimeout(conn);
- theIdleConns.push(conn);
+ theIdleConns->push(conn);
} else {
debugs(93, 3, HERE << "closing pconn" << comment);
// comm_close will clear timeout
@@ -144,6 +144,12 @@
fd_table[conn->fd].noteUse(NULL); // pconn re-use but not via PconnPool API
}
+void Adaptation::Icap::ServiceRep::noteConnectionFailed(const char *comment)
+{
+ debugs(93, 3, HERE << "Connection failed: " << comment);
+ --theBusyConns;
+}
+
void Adaptation::Icap::ServiceRep::setMaxConnections()
{
if (cfg().maxConn >= 0)
@@ -171,8 +177,8 @@
if (!available && !connOverloadReported) {
debugs(93, DBG_IMPORTANT, "WARNING: ICAP Max-Connections limit " <<
"exceeded for service " << cfg().uri << ". Open connections now: " <<
- theBusyConns + theIdleConns.count() << ", including " <<
- theIdleConns.count() << " idle persistent connections.");
+ theBusyConns + theIdleConns->count() << ", including " <<
+ theIdleConns->count() << " idle persistent connections.");
connOverloadReported = true;
}
@@ -191,7 +197,7 @@
// Waiters affect the number of needed connections but a needed
// connection may still be excessive from Max-Connections p.o.v.
// so we should not account for waiting transaction needs here.
- const int debt = theBusyConns + theIdleConns.count() - theMaxConnections;
+ const int debt = theBusyConns + theIdleConns->count() - theMaxConnections;
if (debt > 0)
return debt;
else
@@ -378,7 +384,7 @@
debugs(93,8, "ICAPServiceRep::callWhenAvailable");
Must(cb!=NULL);
Must(up());
- Must(!theIdleConns.count()); // or we should not be waiting
+ Must(!theIdleConns->count()); // or we should not be waiting
Client i;
i.service = Pointer(this);
@@ -561,10 +567,10 @@
const int excess = excessConnections();
// if we owe connections and have idle pconns, close the latter
// XXX: but ... idle pconn to *where*?
- if (excess && theIdleConns.count() > 0) {
- const int n = min(excess, theIdleConns.count());
+ if (excess && theIdleConns->count() > 0) {
+ const int n = min(excess, theIdleConns->count());
debugs(93,5, HERE << "closing " << n << " pconns to relief debt");
- theIdleConns.closeN(n);
+ theIdleConns->closeN(n);
}
scheduleNotification();
=== modified file 'src/adaptation/icap/ServiceRep.h'
--- src/adaptation/icap/ServiceRep.h 2011-06-17 06:04:05 +0000
+++ src/adaptation/icap/ServiceRep.h 2011-07-01 15:05:54 +0000
@@ -113,6 +113,7 @@
Comm::ConnectionPointer getConnection(bool isRetriable, bool &isReused);
void putConnection(const Comm::ConnectionPointer &conn, bool isReusable, const char *comment);
void noteConnectionUse(const Comm::ConnectionPointer &conn);
+ void noteConnectionFailed(const char *comment);
void noteFailure(); // called by transactions to report service failure
@@ -160,7 +161,7 @@
int theMaxConnections; ///< the maximum allowed connections to the service
// TODO: use a better type like the FadingCounter for connOverloadReported
mutable bool connOverloadReported; ///< whether we reported exceeding theMaxConnections
- IdleConnList theIdleConns; ///< idle persistent connection pool
+ IdleConnList *theIdleConns; ///< idle persistent connection pool
FadingCounter theSessionFailures;
const char *isSuspended; // also stores suspension reason for debugging
=== modified file 'src/adaptation/icap/Xaction.cc'
--- src/adaptation/icap/Xaction.cc 2011-06-17 10:41:10 +0000
+++ src/adaptation/icap/Xaction.cc 2011-07-01 15:09:06 +0000
@@ -16,6 +16,7 @@
#include "pconn.h"
#include "HttpRequest.h"
#include "HttpReply.h"
+#include "ipcache.h"
#include "acl/FilledChecklist.h"
#include "icap_log.h"
#include "fde.h"
@@ -85,6 +86,13 @@
Must(static_cast<size_t>(readBuf.potentialSpaceSize()) <= commBufSize);
}
+static void
+icapLookupDnsResults(const ipcache_addrs *ia, const DnsLookupDetails &, void *data)
+{
+ Adaptation::Icap::Xaction *xa = static_cast<Adaptation::Icap::Xaction *>(data);
+ xa->dnsLookupDone(ia);
+}
+
// TODO: obey service-specific, OPTIONS-reported connection limit
void
Adaptation::Icap::Xaction::openConnection()
@@ -101,11 +109,6 @@
if (wasReused && Comm::IsConnOpen(connection)) {
// Set comm Close handler
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
- closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
- CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
- comm_add_close_handler(connection->fd, closer);
-
// fake the connect callback
// TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
@@ -124,23 +127,42 @@
// Attempt to open a new connection...
debugs(93,3, typeName << " opens connection to " << s.cfg().host.termedBuf() << ":" << s.cfg().port);
- // TODO: find the IPs and attempt each one if this is a named service.
- connection->remote = s.cfg().host.termedBuf();
+ // Locate the Service IP(s) to open
+ ipcache_nbgethostbyname(s.cfg().host.termedBuf(), icapLookupDnsResults, this);
+}
+
+void
+Adaptation::Icap::Xaction::dnsLookupDone(const ipcache_addrs *ia)
+{
+ Adaptation::Icap::ServiceRep &s = service();
+
+ if (ia == NULL) {
+ debugs(44, DBG_IMPORTANT, "ICAP: Unknown service host: " << s.cfg().host);
+
+#if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS
+ dieOnConnectionFailure(); // throws
+#else // take a step back into protected Async call dialing.
+ // fake the connect callback
+ typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
+ CbcPointer<Xaction> self(this);
+ Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
+ dialer.params.conn = connection;
+ dialer.params.flag = COMM_ERROR;
+ // fake other parameters by copying from the existing connection
+ connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
+ ScheduleCallHere(connector);
+#endif
+ return;
+ }
+
+ assert(ia->cur < ia->count);
+
+ connection = new Comm::Connection;
+ connection->remote = ia->in_addrs[ia->cur];
connection->remote.SetPort(s.cfg().port);
+ getOutgoingAddress(NULL, connection);
// TODO: service bypass status may differ from that of a transaction
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
- TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
-
- commSetTimeout(connection->fd, TheConfig.connect_timeout(
- service().cfg().bypass), timeoutCall);
-
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
- closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
- CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
- comm_add_close_handler(connection->fd, closer);
-
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
Comm::ConnOpener *cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass));
@@ -206,6 +228,12 @@
if (io.flag != COMM_OK)
dieOnConnectionFailure(); // throws
+ typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
+ AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
+ TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
+ commSetTimeout(io.conn->fd, TheConfig.connect_timeout(
+ service().cfg().bypass), timeoutCall);
+
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
@@ -221,6 +249,7 @@
{
debugs(93, 2, HERE << typeName <<
" failed to connect to " << service().cfg().uri);
+ service().noteConnectionFailed("failure");
detailError(ERR_DETAIL_ICAP_XACT_START);
throw TexcHere("cannot connect to the ICAP service");
}
@@ -268,7 +297,11 @@
theService->cfg().uri << status());
reuseConnection = false;
const bool whileConnecting = connector != NULL;
- closeConnection(); // so that late Comm callbacks do not disturb bypass
+ if (whileConnecting) {
+ assert(!haveConnection());
+ theService->noteConnectionFailed("timedout");
+ } else
+ closeConnection(); // so that late Comm callbacks do not disturb bypass
throw TexcHere(whileConnecting ?
"timed out while connecting to the ICAP service" :
"timed out while talking to the ICAP service");
=== modified file 'src/adaptation/icap/Xaction.h'
--- src/adaptation/icap/Xaction.h 2011-06-04 12:48:45 +0000
+++ src/adaptation/icap/Xaction.h 2011-06-29 10:27:11 +0000
@@ -41,6 +41,7 @@
#include "adaptation/Initiate.h"
#include "AccessLogEntry.h"
#include "HttpReply.h"
+#include "ipcache.h"
class CommConnectCbParams;
@@ -133,6 +134,7 @@
// custom exception handling and end-of-call checks
virtual void callException(const std::exception &e);
virtual void callEnd();
+ void dnsLookupDone(const ipcache_addrs *ia);
protected:
// logging
=== modified file 'src/comm.cc'
--- src/comm.cc 2011-06-04 12:48:45 +0000
+++ src/comm.cc 2011-06-30 16:48:29 +0000
@@ -1156,7 +1156,7 @@
commCallCloseHandlers(fd);
- if (F->pconn.uses)
+ if (F->pconn.uses && F->pconn.pool)
F->pconn.pool->noteUses(F->pconn.uses);
comm_empty_os_read_buffers(fd);