A fixed patch.
On 05/08/2011 06:46 AM, Amos Jeffries wrote:
On 27/04/11 23:40, Tsantilas Christos wrote:
This patch implements the "Service Overload Handling" feature as
described in the squid wiki page:
http://wiki.squid-cache.org/Features/ServiceOverload
This feature also exists as bug #2055 in the Squid bugzilla:
http://bugs.squid-cache.org/show_bug.cgi?id=2055
More information is included in the patch preamble.
Regards,
Christos
Some relatively minor tweaks in the parse handling:
Adaptation::ServiceConfig::grokLong(
Adaptation::ServiceConfig::grokOnOverload(
Adaptation::ServiceConfig::grokBool(
- Please use DBG_CRITICAL instead of ", 0,". Or if it is not critical
bump to an appropriate alternative (parsing errors are usually
DBG_IMPORTANT).
- use "ERROR:", "FATAL:","WARNING:" as appropriate to the seriousness of
the config problem.
* For parsing problem debugs please just use "cfg_filename" instead of
"HERE << cfg_filename".
I made small fixes to the ServiceConfig::grokLong and
ServiceConfig::grokOnOverload methods.
I will fix the ::grokBool method in a separate patch after the
max-connections patch is committed.
One *Major* problem:
Adaptation::Icap::ServiceRep::getConnection() is missing IP split-stack
support removed from Adaptation::Icap::Xaction::openConnection().
Agrr
I missed this one....
On systems where IPv6 sockets can be opened but the ICAP server is
IPv4-only (MacOSX, OpenBSD, Windows, manually disabled kernels) this
will fail to connect() the socket.
You will need to preserve the ipv6=on|off option behaviour. Like so:
Ip::Address anyAddr; // default: IP6_ANY_ADDR
...
// need a new connection
if (!reuse) {
if (!Ip::EnableIpv6)
anyAddr.SetIPv4();
else if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && !cfg().ipv6)
/* split-stack for now requires default IPv4-only socket */
anyAddr.SetIPv4();
We are assuming that Address::SetIPv4 will always return true for
ANY_ADDR...
Maybe we can use
if (!Ip::EnableIpv6 || (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK &&
!cfg().ipv6))
But I left it as you suggested, just to keep the "split-stack for now..." comment.
connection = comm_open(SOCK_STREAM, 0, anyAddr, COMM_NONBLOCKING,
cfg().uri.termedBuf());
}
...
NP: the failover logics in comm_openex() are for handling sockets with
tcp_outgoing_addr pre-known. *_ANY_ADDR will never fail based on family
and so will never failover.
Amos
This patch implements phase 1 of the ICAP Max-Connections feature as
described in the Squid wiki:
http://wiki.squid-cache.org/Features/ServiceOverload
The behaviour of the patch can be configured using on_overload and max_conn
options of the icap_service configuration parameter. Squid can be configured
to do one of the following:
- Block: send an HTTP error response to the subscriber
- Bypass: ignore the "over-connected" ICAP service
- Wait: wait (in a FIFO queue) for an ICAP connection slot
- Force: proceed, ignoring the Max-Connections limit
Squid warns the first time the service becomes overloaded.
For more information please visit the feature wiki page given above.
Technical information:
The patch starts counting a connection to the ICAP server as active when the
ModXact class receives an FD, even if the FD is not really connected to the
server yet, and decreases the number of active connections to the server when
the ModXact object releases its FD.
If the Max-Connections limit is reached, Squid puts the request on a waiters list.
When one or more connections are released, Squid schedules one or more waiters for
execution and removes them from the waiters list.
To handle cases where a waiter is gone or canceled before its execution, the
custom dialer ConnWaiterDialer is used.
OPTIONS connections are counted as active connections but are not limited by
the Max-Connections limit. An OPTIONS request will be executed even if the
maximum number of connections is reached.
=== modified file 'src/adaptation/Elements.h'
--- src/adaptation/Elements.h 2008-03-24 04:40:39 +0000
+++ src/adaptation/Elements.h 2011-04-07 14:40:53 +0000
@@ -8,6 +8,7 @@
typedef enum { methodNone, methodReqmod, methodRespmod, methodOptions } Method;
typedef enum { pointNone, pointPreCache, pointPostCache } VectPoint;
+typedef enum { srvBlock, srvBypass, srvWait, srvForce} SrvBehaviour;
extern const char *crlf;
extern const char *methodStr(Method); // TODO: make into a stream operator?
=== modified file 'src/adaptation/ServiceConfig.cc'
--- src/adaptation/ServiceConfig.cc 2011-03-08 23:56:22 +0000
+++ src/adaptation/ServiceConfig.cc 2011-05-09 08:34:21 +0000
@@ -9,7 +9,8 @@
Adaptation::ServiceConfig::ServiceConfig():
port(-1), method(methodNone), point(pointNone),
- bypass(false), routing(false), ipv6(false)
+ bypass(false), maxConn(-1), onOverload(srvWait),
+ routing(false), ipv6(false)
{}
const char *
@@ -70,6 +71,7 @@
// handle optional service name=value parameters
const char *lastOption = NULL;
bool grokkedUri = false;
+ bool onOverloadSet = false;
while (char *option = strtok(NULL, w_space)) {
if (strcmp(option, "0") == 0) { // backward compatibility
bypass = false;
@@ -91,9 +93,9 @@
// TODO: warn if option is set twice?
bool grokked = false;
- if (strcmp(name, "bypass") == 0)
+ if (strcmp(name, "bypass") == 0) {
grokked = grokBool(bypass, name, value);
- else if (strcmp(name, "routing") == 0)
+ } else if (strcmp(name, "routing") == 0)
grokked = grokBool(routing, name, value);
else if (strcmp(name, "uri") == 0)
grokked = grokkedUri = grokUri(value);
@@ -101,6 +103,11 @@
grokked = grokBool(ipv6, name, value);
if (grokked && ipv6 && !Ip::EnableIpv6)
debugs(3, DBG_IMPORTANT, "WARNING: IPv6 is disabled. ICAP service option ignored.");
+ } else if (strcmp(name, "max-conn") == 0)
+ grokked = grokLong(maxConn, name, value);
+ else if (strcmp(name, "on-overload") == 0) {
+ grokked = grokOnOverload(onOverload, value);
+ onOverloadSet = true;
} else
grokked = grokExtension(name, value);
@@ -108,6 +115,10 @@
return false;
}
+ // set default on-overload value if needed
+ if (!onOverloadSet)
+ onOverload = bypass ? srvBypass : srvWait;
+
// what is left must be the service URI
if (!grokkedUri && !grokUri(lastOption))
return false;
@@ -247,6 +258,41 @@
}
bool
+Adaptation::ServiceConfig::grokLong(long &var, const char *name, const char *value)
+{
+ char *bad = NULL;
+ const long p = strtol(value, &bad, 0);
+ if (p < 0 || bad == value) {
+ debugs(3, DBG_CRITICAL, "ERROR: " << cfg_filename << ':' <<
+ config_lineno << ": " << "wrong value for " << name << "; " <<
+ "a non-negative integer expected but got: " << value);
+ return false;
+ }
+ var = p;
+ return true;
+}
+
+bool
+Adaptation::ServiceConfig::grokOnOverload(SrvBehaviour &var, const char *value)
+{
+ if (strcmp(value, "block") == 0)
+ var = srvBlock;
+ else if (strcmp(value, "bypass") == 0)
+ var = srvBypass;
+ else if (strcmp(value, "wait") == 0)
+ var = srvWait;
+ else if (strcmp(value, "force") == 0)
+ var = srvForce;
+ else {
+ debugs(3, DBG_CRITICAL, "ERROR: " << cfg_filename << ':' <<
+ config_lineno << ": " << "wrong value for on-overload; " <<
+ "'block', 'bypass', 'wait' or 'force' expected but got: " << value);
+ return false;
+ }
+ return true;
+}
+
+bool
Adaptation::ServiceConfig::grokExtension(const char *name, const char *value)
{
// we do not accept extensions by default
=== modified file 'src/adaptation/ServiceConfig.h'
--- src/adaptation/ServiceConfig.h 2010-12-18 00:31:53 +0000
+++ src/adaptation/ServiceConfig.h 2011-04-27 10:24:19 +0000
@@ -32,6 +32,10 @@
Method method; // what is being adapted (REQMOD vs RESPMOD)
VectPoint point; // where the adaptation happens (pre- or post-cache)
bool bypass;
+
+ // options
+ long maxConn; ///< maximum number of concurrent service transactions
+ SrvBehaviour onOverload; ///< how to handle Max-Connections feature
bool routing; ///< whether this service may determine the next service(s)
bool ipv6; ///< whether this service uses IPv6 transport (default IPv4)
@@ -42,6 +46,9 @@
/// interpret parsed values
bool grokBool(bool &var, const char *name, const char *value);
bool grokUri(const char *value);
+ bool grokLong(long &var, const char *name, const char *value);
+ /// handle on-overload configuration option
+ bool grokOnOverload(SrvBehaviour &var, const char *value);
/// handle name=value configuration option with name unknown to Squid
virtual bool grokExtension(const char *name, const char *value);
};
=== modified file 'src/adaptation/icap/ModXact.cc'
--- src/adaptation/icap/ModXact.cc 2011-05-02 01:14:30 +0000
+++ src/adaptation/icap/ModXact.cc 2011-05-09 08:54:55 +0000
@@ -86,8 +86,8 @@
canStartBypass = service().cfg().bypass;
// it is an ICAP violation to send request to a service w/o known OPTIONS
-
- if (service().up())
+ // and the service may is too busy for us: honor Max-Connections and such
+ if (service().up() && service().availableForNew())
startWriting();
else
waitForService();
@@ -95,13 +95,44 @@
void Adaptation::Icap::ModXact::waitForService()
{
+ const char *comment;
Must(!state.serviceWaiting);
- debugs(93, 7, HERE << "will wait for the ICAP service" << status());
- typedef NullaryMemFunT<ModXact> Dialer;
- AsyncCall::Pointer call = JobCallback(93,5,
- Dialer, this, Adaptation::Icap::ModXact::noteServiceReady);
- service().callWhenReady(call);
+
+ if (!service().up()) {
+ AsyncCall::Pointer call = JobCallback(93,5,
+ ConnWaiterDialer, this, Adaptation::Icap::ModXact::noteServiceReady);
+
+ service().callWhenReady(call);
+ comment = "to be up";
+ } else {
+ //The service is unavailable because of max-connection or other reason
+
+ if (service().cfg().onOverload != srvWait) {
+ // The service is overloaded, but waiting to be available prohibited by
+ // user configuration (onOverload is set to "block" or "bypass")
+ if (service().cfg().onOverload == srvBlock)
+ disableBypass("not available", true);
+ else //if (service().cfg().onOverload == srvBypass)
+ canStartBypass = true;
+
+ disableRetries();
+ disableRepeats("ICAP service is not available");
+
+ debugs(93, 7, HERE << "will not wait for the service to be available" <<
+ status());
+
+ throw TexcHere("ICAP service is not available");
+ }
+
+ AsyncCall::Pointer call = JobCallback(93,5,
+ ConnWaiterDialer, this, Adaptation::Icap::ModXact::noteServiceAvailable);
+ service().callWhenAvailable(call, state.waitedForService);
+ comment = "to be available";
+ }
+
+ debugs(93, 7, HERE << "will wait for the service " << comment << status());
state.serviceWaiting = true; // after callWhenReady() which may throw
+ state.waitedForService = true;
}
void Adaptation::Icap::ModXact::noteServiceReady()
@@ -109,13 +140,27 @@
Must(state.serviceWaiting);
state.serviceWaiting = false;
- if (service().up()) {
- startWriting();
- } else {
+ if (!service().up()) {
disableRetries();
disableRepeats("ICAP service is unusable");
throw TexcHere("ICAP service is unusable");
}
+
+ if (service().availableForOld())
+ startWriting();
+ else
+ waitForService();
+}
+
+void Adaptation::Icap::ModXact::noteServiceAvailable()
+{
+ Must(state.serviceWaiting);
+ state.serviceWaiting = false;
+
+ if (service().up() && service().availableForOld())
+ startWriting();
+ else
+ waitForService();
}
void Adaptation::Icap::ModXact::startWriting()
=== modified file 'src/adaptation/icap/ModXact.h'
--- src/adaptation/icap/ModXact.h 2010-10-21 08:13:41 +0000
+++ src/adaptation/icap/ModXact.h 2011-04-26 14:38:06 +0000
@@ -157,6 +157,7 @@
// service waiting
void noteServiceReady();
+ void noteServiceAvailable();
public:
InOut virgin;
@@ -303,6 +304,7 @@
bool allowedPostview206; // must handle 206 Partial Content outside preview
bool allowedPreview206; // must handle 206 Partial Content inside preview
bool readyForUob; ///< got a 206 response and expect a use-origin-body
+ bool waitedForService; ///< true if was queued at least once
// will not write anything [else] to the ICAP server connection
bool doneWriting() const { return writing == writingReallyDone; }
=== modified file 'src/adaptation/icap/Options.cc'
--- src/adaptation/icap/Options.cc 2010-10-13 00:14:42 +0000
+++ src/adaptation/icap/Options.cc 2011-04-26 14:31:14 +0000
@@ -98,6 +98,8 @@
}
cfgIntHeader(h, "Max-Connections", max_connections);
+ if (max_connections == 0)
+ debugs(93, DBG_IMPORTANT, "WARNING: Max-Connections is set to zero! ");
cfgIntHeader(h, "Options-TTL", theTTL);
=== modified file 'src/adaptation/icap/ServiceRep.cc'
--- src/adaptation/icap/ServiceRep.cc 2011-03-11 23:02:23 +0000
+++ src/adaptation/icap/ServiceRep.cc 2011-05-09 09:33:35 +0000
@@ -11,19 +11,27 @@
#include "adaptation/icap/ServiceRep.h"
#include "base/TextException.h"
#include "ConfigParser.h"
+#include "ip/tools.h"
#include "HttpReply.h"
#include "SquidTime.h"
+#include "fde.h"
CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, ServiceRep);
Adaptation::Icap::ServiceRep::ServiceRep(const ServiceConfigPointer &svcCfg):
AsyncJob("Adaptation::Icap::ServiceRep"), Adaptation::Service(svcCfg),
theOptions(NULL), theOptionsFetcher(0), theLastUpdate(0),
+ theBusyConns(0),
+ theAllWaiters(0),
+ connOverloadReported(false),
+ theIdleConns("ICAP Service"),
isSuspended(0), notifying(false),
updateScheduled(false),
wasAnnouncedUp(true), // do not announce an "up" service at startup
isDetached(false)
-{}
+{
+ setMaxConnections();
+}
Adaptation::Icap::ServiceRep::~ServiceRep()
{
@@ -72,6 +80,157 @@
// should be configurable.
}
+// returns a persistent or brand new connection; negative int on failures
+int Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused)
+{
+ Ip::Address client_addr;
+
+ int connection = theIdleConns.pop(cfg().host.termedBuf(), cfg().port, NULL, client_addr,
+ retriableXact);
+
+ reused = connection >= 0; // reused a persistent connection
+
+ if (!reused) { // need a new connection
+ Ip::Address outgoing; // default: IP6_ANY_ADDR
+ if (!Ip::EnableIpv6)
+ outgoing.SetIPv4();
+ else if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && !cfg().ipv6) {
+ /* split-stack for now requires default IPv4-only socket */
+ outgoing.SetIPv4();
+ }
+ connection = comm_open(SOCK_STREAM, 0, outgoing, COMM_NONBLOCKING, cfg().uri.termedBuf());
+ }
+
+ if (connection >= 0)
+ ++theBusyConns;
+
+ return connection;
+}
+
+// pools connection if it is reusable or closes it
+void Adaptation::Icap::ServiceRep::putConnection(int fd, bool isReusable, const char *comment)
+{
+ Must(fd >= 0);
+ // do not pool an idle connection if we owe connections
+ if (isReusable && excessConnections() == 0) {
+ debugs(93, 3, HERE << "pushing pconn" << comment);
+ commSetTimeout(fd, -1, NULL, NULL);
+ Ip::Address anyAddr;
+ theIdleConns.push(fd, cfg().host.termedBuf(), cfg().port, NULL, anyAddr);
+ } else {
+ debugs(93, 3, HERE << "closing pconn" << comment);
+ // comm_close will clear timeout
+ comm_close(fd);
+ }
+
+ Must(theBusyConns > 0);
+ --theBusyConns;
+ // a connection slot released. Check if there are waiters....
+ busyCheckpoint();
+}
+
+// a wrapper to avoid exposing theIdleConns
+void Adaptation::Icap::ServiceRep::noteConnectionUse(int fd)
+{
+ Must(fd >= 0);
+ fd_table[fd].noteUse(&theIdleConns);
+}
+
+void Adaptation::Icap::ServiceRep::setMaxConnections()
+{
+ if (cfg().maxConn >= 0)
+ theMaxConnections = cfg().maxConn;
+ else if (theOptions && theOptions->max_connections >= 0)
+ theMaxConnections = theOptions->max_connections;
+ else {
+ theMaxConnections = -1;
+ return;
+ }
+
+ if (::Config.workers > 1 )
+ theMaxConnections /= ::Config.workers;
+}
+
+int Adaptation::Icap::ServiceRep::availableConnections() const
+{
+ if (theMaxConnections < 0)
+ return -1;
+
+ // we are available if we can open or reuse connections
+ // in other words, if we will not create debt
+ int available = max(0, theMaxConnections - theBusyConns);
+
+ if (!available && !connOverloadReported) {
+ debugs(93, DBG_IMPORTANT, "WARNING: ICAP Max-Connections limit " <<
+ "exceeded for service " << cfg().uri << ". Open connections now: " <<
+ theBusyConns + theIdleConns.count() << ", including " <<
+ theIdleConns.count() << " idle persistent connections.");
+ connOverloadReported = true;
+ }
+
+ if (cfg().onOverload == srvForce)
+ return -1;
+
+ return available;
+}
+
+// The number of connections which excess the Max-Connections limit
+int Adaptation::Icap::ServiceRep::excessConnections() const
+{
+ if (theMaxConnections < 0)
+ return 0;
+
+ // Waiters affect the number of needed connections but a needed
+ // connection may still be excessive from Max-Connections p.o.v.
+ // so we should not account for waiting transaction needs here.
+ const int debt = theBusyConns + theIdleConns.count() - theMaxConnections;
+ if (debt > 0)
+ return debt;
+ else
+ return 0;
+}
+
+void Adaptation::Icap::ServiceRep::noteGoneWaiter()
+{
+ theAllWaiters--;
+
+ // in case the notified transaction did not take the connection slot
+ busyCheckpoint();
+}
+
+// called when a connection slot may become available
+void Adaptation::Icap::ServiceRep::busyCheckpoint()
+{
+ if (theNotificationWaiters.empty()) // nobody is waiting for a slot
+ return;
+
+ int freed = 0;
+ int available = availableConnections();
+
+ if (available < 0) {
+ // It is possible to have waiters when no limit on connections exist in
+ // case of reconfigure or because new Options received.
+ // In this case, notify all waiting transactions.
+ freed = theNotificationWaiters.size();
+ } else {
+ // avoid notifying more waiters than there will be available slots
+ const int notifiedWaiters = theAllWaiters - theNotificationWaiters.size();
+ freed = available - notifiedWaiters;
+ }
+
+ debugs(93,7, HERE << "Available connections: " << available <<
+ " freed slots: " << freed <<
+ " waiting in queue: " << theNotificationWaiters.size());
+
+ while (freed > 0 && !theNotificationWaiters.empty()) {
+ Client i = theNotificationWaiters.front();
+ theNotificationWaiters.pop_front();
+ ScheduleCallHere(i.callback);
+ i.callback = NULL;
+ --freed;
+ }
+}
+
void Adaptation::Icap::ServiceRep::suspend(const char *reason)
{
if (isSuspended) {
@@ -99,6 +258,25 @@
return !isSuspended && hasOptions();
}
+bool Adaptation::Icap::ServiceRep::availableForNew() const
+{
+ Must(up());
+ int available = availableConnections();
+ if (available < 0)
+ return true;
+ else
+ return (available - theAllWaiters > 0);
+}
+
+bool Adaptation::Icap::ServiceRep::availableForOld() const
+{
+ Must(up());
+
+ int available = availableConnections();
+ return (available != 0); // it is -1 (no limit) or has available slots
+}
+
+
bool Adaptation::Icap::ServiceRep::wantsUrl(const String &urlPath) const
{
Must(hasOptions());
@@ -187,6 +365,24 @@
notifying = false;
}
+void Adaptation::Icap::ServiceRep::callWhenAvailable(AsyncCall::Pointer &cb, bool priority)
+{
+ debugs(93,8, "ICAPServiceRep::callWhenAvailable");
+ Must(cb!=NULL);
+ Must(up());
+ Must(!theIdleConns.count()); // or we should not be waiting
+
+ Client i;
+ i.service = Pointer(this);
+ i.callback = cb;
+ if (priority)
+ theNotificationWaiters.push_front(i);
+ else
+ theNotificationWaiters.push_back(i);
+
+ busyCheckpoint();
+}
+
void Adaptation::Icap::ServiceRep::callWhenReady(AsyncCall::Pointer &cb)
{
Must(cb!=NULL);
@@ -351,6 +547,17 @@
debugs(93,3, HERE << "got new options and is now " << status());
scheduleUpdate(optionsFetchTime());
+
+ setMaxConnections();
+ const int excess = excessConnections();
+ // if we owe connections and have idle pconns, close the latter
+ if (excess && theIdleConns.count() > 0) {
+ const int n = min(excess, theIdleConns.count());
+ debugs(93,5, HERE << "closing " << n << " pconns to relief debt");
+ Ip::Address anyAddr;
+ theIdleConns.closeN(n, cfg().host.termedBuf(), cfg().port, NULL, anyAddr);
+ }
+
scheduleNotification();
}
@@ -486,3 +693,22 @@
{
return isDetached;
}
+
+Adaptation::Icap::ConnWaiterDialer::ConnWaiterDialer(const CbcPointer<ModXact> &xact,
+ Adaptation::Icap::ConnWaiterDialer::Parent::Method aHandler):
+ Parent(xact, aHandler)
+{
+ theService = &xact->service();
+ theService->noteNewWaiter();
+}
+
+Adaptation::Icap::ConnWaiterDialer::ConnWaiterDialer(const Adaptation::Icap::ConnWaiterDialer &aConnWaiter): Parent(aConnWaiter)
+{
+ theService = aConnWaiter.theService;
+ theService->noteNewWaiter();
+}
+
+Adaptation::Icap::ConnWaiterDialer::~ConnWaiterDialer()
+{
+ theService->noteGoneWaiter();
+}
=== modified file 'src/adaptation/icap/ServiceRep.h'
--- src/adaptation/icap/ServiceRep.h 2011-03-08 23:56:22 +0000
+++ src/adaptation/icap/ServiceRep.h 2011-04-27 11:20:11 +0000
@@ -40,7 +40,10 @@
#include "adaptation/forward.h"
#include "adaptation/Initiator.h"
#include "adaptation/icap/Elements.h"
-
+#include "base/AsyncJobCalls.h"
+#include "comm.h"
+#include "pconn.h"
+#include <deque>
namespace Adaptation
{
@@ -94,9 +97,12 @@
virtual bool probed() const; // see comments above
virtual bool up() const; // see comments above
+ bool availableForNew() const; ///< a new transaction may start communicating with the service
+ bool availableForOld() const; ///< a transaction notified about connection slot availability may start communicating with the service
virtual Initiate *makeXactLauncher(HttpMsg *virginHeader, HttpRequest *virginCause);
+ void callWhenAvailable(AsyncCall::Pointer &cb, bool priority = false);
void callWhenReady(AsyncCall::Pointer &cb);
// the methods below can only be called on an up() service
@@ -104,9 +110,16 @@
bool wantsPreview(const String &urlPath, size_t &wantedSize) const;
bool allows204() const;
bool allows206() const;
+ int getConnection(bool isRetriable, bool &isReused);
+ void putConnection(int fd, bool isReusable, const char *comment);
+ void noteConnectionUse(int fd);
void noteFailure(); // called by transactions to report service failure
+ void noteNewWaiter() {theAllWaiters++;} ///< New xaction waiting for service to be up or available
+ void noteGoneWaiter(); ///< An xaction is not waiting any more for service to be available
+ bool existWaiters() const {return (theAllWaiters > 0);} ///< if there are xactions waiting for the service to be available
+
//AsyncJob virtual methods
virtual bool doneAll() const { return Adaptation::Initiator::doneAll() && false;}
virtual void callException(const std::exception &e);
@@ -130,12 +143,25 @@
};
typedef Vector<Client> Clients;
+ // TODO: rename to theUpWaiters
Clients theClients; // all clients waiting for a call back
Options *theOptions;
CbcPointer<Adaptation::Initiate> theOptionsFetcher; // pending ICAP OPTIONS transaction
time_t theLastUpdate; // time the options were last updated
+ /// FIFO queue of xactions waiting for a connection slot and not yet notified
+ /// about it; xaction is removed when notification is scheduled
+ std::deque<Client> theNotificationWaiters;
+ int theBusyConns; ///< number of connections given to active transactions
+ /// number of xactions waiting for a connection slot (notified and not)
+ /// the number is decreased after the xaction receives notification
+ int theAllWaiters;
+ int theMaxConnections; ///< the maximum allowed connections to the service
+ // TODO: use a better type like the FadingCounter for connOverloadReported
+ mutable bool connOverloadReported; ///< whether we reported exceeding theMaxConnections
+ PconnPool theIdleConns; ///< idle persistent connection pool
+
FadingCounter theSessionFailures;
const char *isSuspended; // also stores suspension reason for debugging
@@ -162,6 +188,21 @@
void announceStatusChange(const char *downPhrase, bool important) const;
+ /// Set the maximum allowed connections for the service
+ void setMaxConnections();
+ /// The number of connections which excess the Max-Connections limit
+ int excessConnections() const;
+ /**
+ * The available connections slots to the ICAP server
+ \return the available slots, or -1 if there is no limit on allowed connections
+ */
+ int availableConnections() const;
+ /**
+ * If there are xactions waiting for the service to be available, notify
+ * as many xactions as the available connections slots.
+ */
+ void busyCheckpoint();
+
const char *status() const;
mutable bool wasAnnouncedUp; // prevent sequential same-state announcements
@@ -169,6 +210,18 @@
CBDATA_CLASS2(ServiceRep);
};
+class ModXact;
+/// Custom dialer to call Service::noteNewWaiter and noteGoneWaiter
+/// to maintain Service idea of waiting and being-notified transactions.
+class ConnWaiterDialer: public NullaryMemFunT<ModXact>
+{
+public:
+ typedef NullaryMemFunT<ModXact> Parent;
+ ServiceRep::Pointer theService;
+ ConnWaiterDialer(const CbcPointer<ModXact> &xact, Parent::Method aHandler);
+ ConnWaiterDialer(const Adaptation::Icap::ConnWaiterDialer &aConnWaiter);
+ ~ConnWaiterDialer();
+};
} // namespace Icap
} // namespace Adaptation
=== modified file 'src/adaptation/icap/Xaction.cc'
--- src/adaptation/icap/Xaction.cc 2011-04-07 12:42:02 +0000
+++ src/adaptation/icap/Xaction.cc 2011-05-09 08:41:11 +0000
@@ -21,8 +21,6 @@
#include "SquidTime.h"
#include "err_detail_type.h"
-static PconnPool *icapPconnPool = new PconnPool("ICAP Servers");
-
//CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction);
@@ -90,19 +88,24 @@
// TODO: obey service-specific, OPTIONS-reported connection limit
void Adaptation::Icap::Xaction::openConnection()
{
- Ip::Address client_addr;
-
Must(connection < 0);
- const Adaptation::Service &s = service();
+ Adaptation::Icap::ServiceRep &s = service();
if (!TheConfig.reuse_connections)
disableRetries(); // this will also safely drain pconn pool
- // TODO: check whether NULL domain is appropriate here
- connection = icapPconnPool->pop(s.cfg().host.termedBuf(), s.cfg().port, NULL, client_addr, isRetriable);
- if (connection >= 0) {
- debugs(93,3, HERE << "reused pconn FD " << connection);
+ bool wasReused = false;
+ connection = s.getConnection(isRetriable, wasReused);
+ if (connection < 0)
+ dieOnConnectionFailure(); // throws
+
+ if (wasReused) {
+ // Set comm Close handler
+ typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
+ closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
+ CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
+ comm_add_close_handler(connection, closer);
// fake the connect callback
// TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
@@ -119,39 +122,25 @@
disableRetries(); // we only retry pconn failures
- Ip::Address outgoing;
- if (!Ip::EnableIpv6 && !outgoing.SetIPv4()) {
- debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << outgoing << " is not an IPv4 address.");
- dieOnConnectionFailure(); // throws
- }
- /* split-stack for now requires default IPv4-only socket */
- if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && outgoing.IsAnyAddr() && !s.cfg().ipv6) {
- outgoing.SetIPv4();
- }
-
- connection = comm_open(SOCK_STREAM, 0, outgoing,
- COMM_NONBLOCKING, s.cfg().uri.termedBuf());
-
- if (connection < 0)
- dieOnConnectionFailure(); // throws
-
- debugs(93,3, typeName << " opens connection to " << s.cfg().host << ":" << s.cfg().port);
+
+ debugs(93,3, typeName << " opens connection to " << s.cfg().host.termedBuf() << ":" << s.cfg().port);
// TODO: service bypass status may differ from that of a transaction
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = JobCallback(93, 5,
- TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
+ AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
+ TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
+
commSetTimeout(connection, TheConfig.connect_timeout(
service().cfg().bypass), timeoutCall);
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
- closer = JobCallback(93, 5,
- CloseDialer, this, Adaptation::Icap::Xaction::noteCommClosed);
+ closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
+ CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
comm_add_close_handler(connection, closer);
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
- connector = JobCallback(93,3,
- ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
+ connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected",
+ ConnectDialer(this, &Adaptation::Icap::Xaction::noteCommConnected));
commConnectStart(connection, s.cfg().host.termedBuf(), s.cfg().port, connector);
}
@@ -186,21 +175,11 @@
reuseConnection = false;
}
- if (reuseConnection) {
- Ip::Address client_addr;
- //status() adds leading spaces.
- debugs(93,3, HERE << "pushing pconn" << status());
- AsyncCall::Pointer call = NULL;
- commSetTimeout(connection, -1, call);
- icapPconnPool->push(connection, theService->cfg().host.termedBuf(),
- theService->cfg().port, NULL, client_addr);
+ if (reuseConnection)
disableRetries();
- } else {
- //status() adds leading spaces.
- debugs(93,3, HERE << "closing pconn" << status());
- // comm_close will clear timeout
- comm_close(connection);
- }
+
+ Adaptation::Icap::ServiceRep &s = service();
+ s.putConnection(connection, reuseConnection, status());
writer = NULL;
reader = NULL;
@@ -218,7 +197,7 @@
if (io.flag != COMM_OK)
dieOnConnectionFailure(); // throws
- fd_table[connection].noteUse(icapPconnPool);
+ service().noteConnectionUse(connection);
handleCommConnected();
}
=== modified file 'src/adaptation/icap/Xaction.h'
--- src/adaptation/icap/Xaction.h 2010-10-21 08:13:41 +0000
+++ src/adaptation/icap/Xaction.h 2011-04-26 14:31:14 +0000
@@ -138,6 +138,7 @@
void setOutcome(const XactOutcome &xo);
virtual void finalizeLogInfo();
+public:
ServiceRep &service();
private:
=== modified file 'src/cf.data.pre'
--- src/cf.data.pre 2011-05-03 03:01:59 +0000
+++ src/cf.data.pre 2011-05-09 08:54:55 +0000
@@ -6548,6 +6548,26 @@
is to use IPv4-only connections. When set to 'on' this option will
make Squid use IPv6-only connections to contact this ICAP service.
+ on-overload=block|bypass|wait|force
+ If the service Max-Connections limit has been reached, do
+ one of the following for each new ICAP transaction:
+ * block: send an HTTP error response to the client
+ * bypass: ignore the "over-connected" ICAP service
+ * wait: wait (in a FIFO queue) for an ICAP connection slot
+ * force: proceed, ignoring the Max-Connections limit
+
+ In SMP mode with N workers, each worker assumes the service
+ connection limit is Max-Connections/N, even though not all
+ workers may use a given service.
+
+ The default value is "bypass" if service is bypassable,
+ otherwise it is set to "wait".
+
+
+ max-conn=number
+ Use the given number as the Max-Connections limit, regardless
+ of the Max-Connections value given by the service, if any.
+
Older icap_service format without optional named parameters is
deprecated but supported for backward compatibility.
=== modified file 'src/comm.cc'
--- src/comm.cc 2011-01-28 07:58:53 +0000
+++ src/comm.cc 2011-03-23 15:55:49 +0000
@@ -1490,7 +1490,7 @@
commCallCloseHandlers(fd);
if (F->pconn.uses)
- F->pconn.pool->count(F->pconn.uses);
+ F->pconn.pool->noteUses(F->pconn.uses);
comm_empty_os_read_buffers(fd);
=== modified file 'src/pconn.cc'
--- src/pconn.cc 2010-12-14 14:01:14 +0000
+++ src/pconn.cc 2011-04-26 14:38:06 +0000
@@ -94,6 +94,9 @@
for (; index < nfds - 1; index++)
fds[index] = fds[index + 1];
+ if (parent)
+ parent->noteConnectionRemoved();
+
if (--nfds == 0) {
debugs(48, 3, "IdleConnList::removeFD: deleting " << hashKeyStr(&hash));
delete this;
@@ -123,6 +126,9 @@
xfree(old);
}
+ if (parent)
+ parent->noteConnectionAdded();
+
fds[nfds++] = fd;
comm_read(fd, fakeReadBuf, sizeof(fakeReadBuf), IdleConnList::read, this);
commSetTimeout(fd, Config.Timeout.pconn, IdleConnList::timeout, this);
@@ -230,7 +236,8 @@
/* ========== PconnPool PUBLIC FUNCTIONS ============================================ */
-PconnPool::PconnPool(const char *aDescr) : table(NULL), descr(aDescr)
+PconnPool::PconnPool(const char *aDescr) : table(NULL), descr(aDescr),
+ theCount(0)
{
int i;
table = hash_create((HASHCMP *) strcmp, 229, hash_string);
@@ -291,6 +298,7 @@
* We close available persistent connection if the caller transaction is not
* retriable to avoid having a growing number of open connections when many
* transactions create persistent connections but are not retriable.
+ * PconnPool::closeN() relies on that behavior as well.
*/
int
PconnPool::pop(const char *host, u_short port, const char *domain, Ip::Address &client_address, bool isRetriable)
@@ -321,13 +329,23 @@
}
void
-PconnPool::unlinkList(IdleConnList *list) const
-{
+PconnPool::closeN(int n, const char *host, u_short port, const char *domain, Ip::Address &client_address)
+{
+ // TODO: optimize: we can probably do hash_lookup just once
+ for (int i = 0; i < n; ++i)
+ pop(host, port, domain, client_address, false); // may fail!
+}
+
+void
+PconnPool::unlinkList(IdleConnList *list)
+{
+ theCount -= list->count();
+ assert(theCount >= 0);
hash_remove_link(table, &list->hash);
}
void
-PconnPool::count(int uses)
+PconnPool::noteUses(int uses)
{
if (uses >= PCONN_HIST_SZ)
uses = PCONN_HIST_SZ - 1;
=== modified file 'src/pconn.h'
--- src/pconn.h 2010-10-07 06:34:34 +0000
+++ src/pconn.h 2011-04-26 14:38:06 +0000
@@ -32,7 +32,6 @@
public:
IdleConnList(const char *key, PconnPool *parent);
~IdleConnList();
- int numIdle() { return nfds; }
int findFDIndex(int fd); ///< search from the end of array
void removeFD(int fd);
@@ -40,6 +39,8 @@
int findUseableFD(); ///< find first from the end not pending read fd.
void clearHandlers(int fd);
+ int count() const { return nfds; }
+
private:
static IOCB read;
static PF timeout;
@@ -82,10 +83,14 @@
void moduleInit();
void push(int fd, const char *host, u_short port, const char *domain, Ip::Address &client_address);
int pop(const char *host, u_short port, const char *domain, Ip::Address &client_address, bool retriable);
- void count(int uses);
+ void noteUses(int uses);
void dumpHist(StoreEntry *e);
void dumpHash(StoreEntry *e);
- void unlinkList(IdleConnList *list) const;
+ void unlinkList(IdleConnList *list);
+ void closeN(int n, const char *host, u_short port, const char *domain, Ip::Address &client_address);
+ int count() const { return theCount; }
+ void noteConnectionAdded() { ++theCount; }
+ void noteConnectionRemoved() { assert(theCount > 0); --theCount; }
private:
@@ -94,7 +99,7 @@
int hist[PCONN_HIST_SZ];
hash_table *table;
const char *descr;
-
+ int theCount; ///< the number of pooled connections
};