On 22.01.2017 22:52, Amos Jeffries wrote:
> as I understood it the existing delay pools design is that multiple
> pools can apply to traffic. In which case on each write() attempt the
> bucket with smallest available amount determines the write size and all
> buckets have the actually consumed amount removed.
I assume that you are mixing up "pools" and "buckets". A pool may have
several "cascading" buckets. The existing server pools have several
buckets for classes > 1. We can configure several server delay pools,
but only the first matched pool is assigned to the transaction. Then
the "smallest" bucket of the matched pool determines the final speed
limit (as you correctly noted). Following this logic, the old "client"
and new "response" pools should be independent (as if they
represented different client pool classes), i.e., in the end we
should select only one "first matched" client-side delay pool. Within
this selected pool, we select the bucket with the smallest level
(individual or aggregate), similarly to what we do for server pools of
classes 2 or 3. However, we skip this selection if the old client pool
matched, because it has only one bucket.
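
A minimal sketch of the "smallest bucket wins" rule within one matched
pool, loosely modelled on MessageBucket::quota() in the attached patch.
The helper name combinedQuota() is hypothetical, and the sketch assumes
that a negative level means "this bucket does not limit":

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical helper, not part of the patch: combines the current levels
// of the individual and aggregate buckets of one matched pool.
// Assumption: a negative level means "no limit from this bucket".
inline int
combinedQuota(const int individualLevel, const int aggregateLevel)
{
    if (aggregateLevel < 0)
        return individualLevel; // may itself be negative: no limit at all
    if (individualLevel < 0)
        return aggregateLevel;
    // both buckets limit: the smaller level determines the write size
    return std::min(individualLevel, aggregateLevel);
}
```

With both buckets limiting, the write size follows the smaller level;
with one side unlimited, the other side alone decides.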
> ClientDelayConfig::parsePoolCount() please use emplace_back instead of
> push_back.
AFAIK, emplace_back() has a benefit over push_back() only when the
object itself is stored (it avoids copying the object). In this case a
pointer is inserted, so the benefit is negligible. I would prefer to keep
the old push_back() here.
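
To illustrate the point, here is a small sketch with a vector of raw
pointers (an assumption made for brevity; the patch stores ref-counted
pointers). Pool and addBoth() are hypothetical names:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a pool object.
struct Pool { int rate = 0; };

// Both calls store one pointer-sized element; no Pool object is copied,
// so for pointers there is nothing for emplace_back() to save.
inline std::size_t
addBoth(std::vector<Pool*> &pools, Pool *a, Pool *b)
{
    pools.push_back(a);    // copies the pointer value
    pools.emplace_back(b); // constructs the pointer element from b
    return pools.size();
}
```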
> is there a specific reason this has to be a list of raw-pointers
> instead of a list of objects like it was?
This change was a consequence of another fix: the ClientDelayPool::access
member was not destroyed on shutdown, causing "indirectly lost" Valgrind
error messages.
The ClientDelayPool class does not support copy semantics (and probably
should not) because of its acl_access pointer, but an STL vector of
objects requires them. So I had to use pointers instead.
However, my latest checks with test-builds.sh revealed further linker
problems, because SquidConfig got an unnecessary dependency on the
ClientDelayConfig destructor. In the end I had to rework the
ClientDelayConfig module, introducing a new ClientDelayPools class (a
storage for ClientDelayPool objects) and making ClientDelayPool
ref-countable.
> which brings up a design contradiction: "why are there required
> 'options'?
> so the answer is that they should not be required, the value -1 in
> pools just means "no limit" or "infinity".
> - the for-loop checking for missing options should be removed and use
> of the "none" magic word as value for these options should be supported.
I agree that we should not require using all response_delay_pool
options. For example, it would be useful to allow only "individual" or
"aggregate" options, e.g.:
#no aggregate limiting
response_delay_pool slowPool1 individual-restore=50000 \
individual-maximum=100000
#no individual limiting, split aggregate bandwidth among clients
response_delay_pool slowPool2 aggregate-restore=100000 \
aggregate-maximum=200000
No need for a "none" word: just omit the unneeded options (or specify a
value of -1). Following the approach of the existing server delay pools
("none" for a speed/maximum pair), we should not allow a "restore"
option without its "maximum" in either the "aggregate" or the
"individual" case.
BTW, it is possible to specify no limitation at all for a pool
(i.e., neither individual nor aggregate limits). The existing server
pools work similarly, allowing "none" to be specified for all buckets
(without any warning message).
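
A minimal sketch of the pairing rule described above, assuming that -1
(any negative value) stands for "option not set". The helper name
pairIsConsistent() is hypothetical:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: a restore/maximum pair is consistent when both
// options are set (>= 0) or both are left unlimited (negative, e.g. -1).
inline bool
pairIsConsistent(const std::int64_t restore, const std::int64_t maximum)
{
    return (restore < 0) == (maximum < 0);
}
```

Under this rule, slowPool1 and slowPool2 above are both accepted, while a
pool with individual-restore but no individual-maximum is rejected.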
> The limitation configured with initial_fill_level= was previously
> configured through directive client_delay_initial_bucket_level.
"client_delay_initial_bucket_level" relates only to the old client delay
pools. I don't think we should reuse it in the new response pools: one
may need to configure the two kinds of pools with different initial levels.
Thanks for the detailed review. I tried to address all other remarks,
renamed the parameters according to the suggested terminology,
merged with the latest v5 r15027 and re-attached the patch.
Eduard.
Added response delay pools feature for Squid-to-client speed limiting.
The feature restricts Squid-to-client bandwidth only. It applies to
both cache hits and misses.
* Rationale *
This may be useful for limiting the bandwidth of specific responses.
There are situations when doing this is hard (or impossible) by means
of netfilter/iptables, which filter on TCP/IP packets and IP address
information. In other words, it is sometimes problematic to 'extract'
a single response from the TCP/IP data flow at the system level. For
example, a single Squid-to-client TCP connection can transmit multiple
responses (persistent connections, pipelining or HTTP/2 connection
multiplexing) or be encrypted (HTTPS proxy mode).
* Description *
When Squid starts delivering the final HTTP response to a client,
Squid checks response_delay_pool_access rules (supporting fast ACLs
only), in the order they were declared. The first rule with a
matching ACL wins. If (and only if) an "allow" rule won, Squid
assigns the response to the corresponding named delay pool.
If a response is assigned to a delay pool, the response becomes
subject to the configured bucket and aggregate bandwidth limits of
that pool, similar to the current "class 2" server-side delay pools,
but with a brand new, dedicated "individual" filled bucket assigned to
the matched response.
The new feature serves the same purpose as the existing client-side
pools: both features limit Squid-to-client bandwidth. Their common
interface was placed into a new base BandwidthBucket class. The
difference is that client-side pools do not aggregate clients and
always use one bucket per client IP. A response may become subject
to both kinds of pools. In such situations, only the matched
response delay pool is used for Squid-to-client speed limiting.
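
A minimal sketch of this precedence, loosely following
BandwidthBucket::SelectBucket() in the patch below. The Bucket type and
the free function selectBucket() are simplified, hypothetical stand-ins:

```cpp
#include <cassert>

// Hypothetical stand-in for the patch's bucket classes.
struct Bucket { const char *kind; };

// Prefer the response delay pool bucket attached to the connection; fall
// back to the per-client-IP bucket only when its limiting is active.
inline const Bucket *
selectBucket(const Bucket *responseBucket, const Bucket *clientBucket,
             const bool clientLimitingActive)
{
    if (responseBucket)
        return responseBucket;
    return clientLimitingActive ? clientBucket : nullptr;
}
```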
* Limitations *
Accurate SMP support (with the aggregate bucket shared among
workers) is outside the scope of this patch. In SMP configurations,
Squid should automatically divide the aggregate-restore and
aggregate-maximum values among the configured number of Squid
workers.
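
One possible reading of "divide among workers" is an even split, sketched
below. This is an assumption, not something the patch is confirmed to do;
perWorkerShare() is a hypothetical name, and negative values are taken to
mean "no limit":

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: splits a configured aggregate value evenly among
// workers using integer division. Assumption: negative means "no limit"
// and is passed through unchanged.
inline std::int64_t
perWorkerShare(const std::int64_t configured, const int workers)
{
    if (configured < 0 || workers <= 1)
        return configured;
    return configured / workers;
}
```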
* Also: *
Fixed ClientDelayConfig, which did not perform cleanup on
destruction, causing memory problems detected by Valgrind. It was not
possible to fix this with minimal changes because of linker problems
with SquidConfig while checking with test-builds.sh. So I had
to refactor the ClientDelayConfig module, separating configuration code
(the old ClientDelayConfig class) from configured data (a new
ClientDelayPools class) and minimizing dependencies on SquidConfig.
=== added file 'src/BandwidthBucket.cc'
--- src/BandwidthBucket.cc 1970-01-01 00:00:00 +0000
+++ src/BandwidthBucket.cc 2017-01-30 13:45:42 +0000
@@ -0,0 +1,107 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+
+#include "BandwidthBucket.h"
+#include "ClientInfo.h"
+#include "comm/Connection.h"
+#include "Debug.h"
+#include "fde.h"
+
+BandwidthBucket::BandwidthBucket(const int speed, const int initialLevelPercent, const double sizeLimit) :
+ bucketLevel( sizeLimit * (initialLevelPercent / 100.0)),
+ selectWaiting(false),
+ writeSpeedLimit(speed),
+ bucketSizeLimit(sizeLimit)
+{
+ getCurrentTime();
+ /* put current time to have something sensible here */
+ prevTime = current_dtime;
+}
+
+void
+BandwidthBucket::refillBucket()
+{
+ if (noLimit())
+ return;
+ // all these times are in seconds, with double precision
+ const double currTime = current_dtime;
+ const double timePassed = currTime - prevTime;
+
+ // Calculate allowance for the time passed. Use double to avoid
+ // accumulating rounding errors for small intervals. For example, always
+ // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error.
+ const double gain = timePassed * writeSpeedLimit;
+
+ // to further combat error accumulation during micro updates,
+ // quit before updating time if we cannot add at least one byte
+ if (gain < 1.0)
+ return;
+
+ prevTime = currTime;
+
+ // for "first" connections, drain initial fat before refilling but keep
+ // updating prevTime to avoid bursts after the fat is gone
+ if (bucketLevel > bucketSizeLimit) {
+ debugs(77, 4, "not refilling while draining initial fat");
+ return;
+ }
+
+ bucketLevel += gain;
+
+ // obey quota limits
+ if (bucketLevel > bucketSizeLimit)
+ bucketLevel = bucketSizeLimit;
+}
+
+bool
+BandwidthBucket::applyQuota(int &nleft, Comm::IoCallback *state)
+{
+ const int q = quota();
+ if (!q)
+ return false;
+ else if (q < 0)
+ return true;
+ const int nleft_corrected = min(nleft, q);
+ if (nleft != nleft_corrected) {
+ debugs(77, 5, state->conn << " writes only " <<
+ nleft_corrected << " out of " << nleft);
+ nleft = nleft_corrected;
+ }
+ return true;
+}
+
+void
+BandwidthBucket::reduceBucket(const int len)
+{
+ if (len <= 0 || noLimit())
+ return;
+ bucketLevel -= len;
+ if (bucketLevel < 0.0) {
+ debugs(77, DBG_IMPORTANT, "drained too much"); // should not happen
+ bucketLevel = 0;
+ }
+}
+
+BandwidthBucket *
+BandwidthBucket::SelectBucket(fde *f)
+{
+ BandwidthBucket *bucket = f->writeQuotaHandler.getRaw();
+ if (!bucket) {
+ ClientInfo *clientInfo = f->clientInfo;
+ if (clientInfo && clientInfo->writeLimitingActive)
+ bucket = clientInfo;
+ }
+ return bucket;
+}
+
+#endif /* USE_DELAY_POOLS */
+
=== added file 'src/BandwidthBucket.h'
--- src/BandwidthBucket.h 1970-01-01 00:00:00 +0000
+++ src/BandwidthBucket.h 2017-01-30 13:45:42 +0000
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef BANDWIDTHBUCKET_H
+#define BANDWIDTHBUCKET_H
+
+#if USE_DELAY_POOLS
+
+#include "comm/IoCallback.h"
+
+class fde;
+
+/// Base class for Squid-to-client bandwidth limiting
+class BandwidthBucket
+{
+public:
+ BandwidthBucket(const int speed, const int initialLevelPercent, const double sizeLimit);
+ virtual ~BandwidthBucket() {}
+
+ static BandwidthBucket *SelectBucket(fde *f);
+
+ /// \returns the number of bytes this bucket allows to write,
+ /// also considering aggregates, if any. Negative quota means
+ /// no limitations by this bucket.
+ virtual int quota() = 0;
+ /// Adjusts nleft to not exceed the current bucket quota value,
+ /// if needed.
+ virtual bool applyQuota(int &nleft, Comm::IoCallback *state);
+ /// Will plan another write call.
+ virtual void scheduleWrite(Comm::IoCallback *state) = 0;
+ /// Performs cleanup when the related file descriptor becomes closed.
+ virtual void onFdClosed() { selectWaiting = false; }
+ /// Decreases the bucket level.
+ virtual void reduceBucket(const int len);
+ /// Whether this bucket will not do bandwidth limiting.
+ bool noLimit() const { return writeSpeedLimit < 0; }
+
+protected:
+ /// Increases the bucket level with the writeSpeedLimit speed.
+ void refillBucket();
+
+public:
+ double bucketLevel; ///< how much can be written now
+ bool selectWaiting; ///< is between commSetSelect and commHandleWrite
+
+protected:
+ double prevTime; ///< previous time when we checked
+ double writeSpeedLimit; ///< Write speed limit in bytes per second.
+ double bucketSizeLimit; ///< maximum bucket size
+};
+
+#endif /* USE_DELAY_POOLS */
+
+#endif
+
=== modified file 'src/ClientDelayConfig.cc'
--- src/ClientDelayConfig.cc 2017-01-01 00:12:22 +0000
+++ src/ClientDelayConfig.cc 2017-01-30 13:45:43 +0000
@@ -1,103 +1,113 @@
/*
* Copyright (C) 1996-2017 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
* Please see the COPYING and CONTRIBUTORS files for details.
*/
#include "squid.h"
#include "acl/Acl.h"
#include "acl/Gadgets.h"
#include "ClientDelayConfig.h"
#include "ConfigParser.h"
#include "Parsing.h"
#include "Store.h"
+ClientDelayPool::~ClientDelayPool()
+{
+ if (access)
+ aclDestroyAccessList(&access);
+}
+
void ClientDelayPool::dump(StoreEntry * entry, unsigned int poolNumberMinusOne) const
{
LOCAL_ARRAY(char, nom, 32);
snprintf(nom, 32, "client_delay_access %d", poolNumberMinusOne + 1);
dump_acl_access(entry, nom, access);
storeAppendPrintf(entry, "client_delay_parameters %d %d %" PRId64 "\n", poolNumberMinusOne + 1, rate,highwatermark);
storeAppendPrintf(entry, "\n");
}
+ClientDelayPools *
+ClientDelayPools::Instance()
+{
+ static ClientDelayPools pools;
+ return &pools;
+}
+
+ClientDelayPools::~ClientDelayPools()
+{
+ pools.clear();
+}
+
void
ClientDelayConfig::finalize()
{
- for (unsigned int i = 0; i < pools.size(); ++i) {
+ for (unsigned int i = 0; i < pools().size(); ++i) {
/* pools require explicit 'allow' to assign a client into them */
- if (!pools[i].access) {
- debugs(77, DBG_IMPORTANT, "client_delay_pool #" << (i+1) <<
+ if (!pool(i).access) {
+ debugs(77, DBG_IMPORTANT, "WARNING: client_delay_pool #" << (i+1) <<
" has no client_delay_access configured. " <<
"No client will ever use it.");
}
}
}
-void ClientDelayConfig::freePoolCount()
-{
- pools.clear();
-}
-
void ClientDelayConfig::dumpPoolCount(StoreEntry * entry, const char *name) const
{
- if (pools.size()) {
- storeAppendPrintf(entry, "%s %d\n", name, (int)pools.size());
- for (unsigned int i = 0; i < pools.size(); ++i)
- pools[i].dump(entry, i);
+ const auto &pools_ = ClientDelayPools::Instance()->pools;
+ if (pools_.size()) {
+ storeAppendPrintf(entry, "%s %d\n", name, static_cast<int>(pools_.size()));
+ for (unsigned int i = 0; i < pools_.size(); ++i)
+ pools_[i]->dump(entry, i);
}
}
+void
+ClientDelayConfig::freePools()
+{
+ pools().clear();
+}
+
void ClientDelayConfig::parsePoolCount()
{
- if (pools.size()) {
- debugs(3, DBG_CRITICAL, "parse_client_delay_pool_count: multiple client_delay_pools lines, aborting all previous client_delay_pools config");
- clean();
+ if (pools().size()) {
+ debugs(3, DBG_CRITICAL, "parse_client_delay_pool_count: multiple client_delay_pools lines, " <<
+ "aborting all previous client_delay_pools config");
+ freePools();
}
unsigned short pools_;
ConfigParser::ParseUShort(&pools_);
- for (int i = 0; i < pools_; ++i) {
- pools.push_back(ClientDelayPool());
- }
+ for (int i = 0; i < pools_; ++i)
+ pools().push_back(new ClientDelayPool());
}
void ClientDelayConfig::parsePoolRates()
{
- unsigned short pool;
- ConfigParser::ParseUShort(&pool);
-
- if (pool < 1 || pool > pools.size()) {
- debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " << pool << " not in 1 .. " << pools.size());
- return;
+ if (unsigned short poolId = parsePoolId()) {
+ --poolId;
+ pool(poolId).rate = GetInteger();
+ pool(poolId).highwatermark = GetInteger64();
}
-
- --pool;
-
- pools[pool].rate = GetInteger();
- pools[pool].highwatermark = GetInteger64();
}
void ClientDelayConfig::parsePoolAccess(ConfigParser &parser)
{
- unsigned short pool;
-
- ConfigParser::ParseUShort(&pool);
-
- if (pool < 1 || pool > pools.size()) {
- debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " << pool << " not in 1 .. " << pools.size());
- return;
- }
-
- --pool;
- aclParseAccessLine("client_delay_access", parser, &pools[pool].access);
+ if (const unsigned short poolId = parsePoolId())
+ aclParseAccessLine("client_delay_access", parser, &(pool(poolId-1).access));
}
-void ClientDelayConfig::clean()
+unsigned short
+ClientDelayConfig::parsePoolId()
{
- for (unsigned int i = 0; i < pools.size(); ++i) {
- aclDestroyAccessList(&pools[i].access);
+ unsigned short poolId = 0;
+ ConfigParser::ParseUShort(&poolId);
+ if (poolId < 1 || poolId > pools().size()) {
+ debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " <<
+ poolId << " not in 1 .. " << pools().size());
+ return 0;
}
+ return poolId;
}
=== modified file 'src/ClientDelayConfig.h'
--- src/ClientDelayConfig.h 2017-01-01 00:12:22 +0000
+++ src/ClientDelayConfig.h 2017-01-30 13:45:43 +0000
@@ -1,60 +1,83 @@
/*
* Copyright (C) 1996-2017 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
* Please see the COPYING and CONTRIBUTORS files for details.
*/
#ifndef SQUID_CLIENTDELAYCONFIG_H
#define SQUID_CLIENTDELAYCONFIG_H
#include "acl/forward.h"
+#include "base/RefCount.h"
#include <vector>
class StoreEntry;
class ConfigParser;
/// \ingroup DelayPoolsAPI
/* represents one client write limiting delay 'pool' */
-class ClientDelayPool
+class ClientDelayPool : public RefCountable
{
public:
+ typedef RefCount<ClientDelayPool> Pointer;
+
ClientDelayPool()
- : access(NULL), rate(0), highwatermark(0) {}
+ : access(nullptr), rate(0), highwatermark(0) {}
+ ~ClientDelayPool();
+ ClientDelayPool(const ClientDelayPool &) = delete;
+ ClientDelayPool &operator=(const ClientDelayPool &) = delete;
+
void dump (StoreEntry * entry, unsigned int poolNumberMinusOne) const;
acl_access *access;
int rate;
int64_t highwatermark;
};
-typedef std::vector<ClientDelayPool> ClientDelayPools;
+class ClientDelayPools
+{
+public:
+ ClientDelayPools(const ClientDelayPools &) = delete;
+ ClientDelayPools &operator=(const ClientDelayPools &) = delete;
+ static ClientDelayPools *Instance();
+
+ std::vector<ClientDelayPool::Pointer> pools;
+private:
+ ClientDelayPools() {}
+ ~ClientDelayPools();
+};
/* represents configuration of client write limiting delay pools */
class ClientDelayConfig
{
public:
ClientDelayConfig()
: initial(50) {}
- void freePoolCount();
+ ClientDelayConfig(const ClientDelayConfig &) = delete;
+ ClientDelayConfig &operator=(const ClientDelayConfig &) = delete;
+
+ void freePools();
void dumpPoolCount(StoreEntry * entry, const char *name) const;
/* parsing of client_delay_pools - number of pools */
void parsePoolCount();
/* parsing of client_delay_parameters lines */
void parsePoolRates();
/* parsing client_delay_access lines */
void parsePoolAccess(ConfigParser &parser);
void finalize(); ///< checks pools configuration
/* initial bucket level, how fill bucket at startup */
unsigned short initial;
- ClientDelayPools pools;
+
private:
- void clean();
+ unsigned short parsePoolId();
+ std::vector<ClientDelayPool::Pointer> &pools() { return ClientDelayPools::Instance()->pools; }
+ ClientDelayPool &pool(const int i) { return *(ClientDelayPools::Instance()->pools.at(i)); }
};
#endif // SQUID_CLIENTDELAYCONFIG_H
=== modified file 'src/ClientInfo.h'
--- src/ClientInfo.h 2017-01-01 00:12:22 +0000
+++ src/ClientInfo.h 2017-01-30 13:45:43 +0000
@@ -1,113 +1,117 @@
/*
* Copyright (C) 1996-2017 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
* Please see the COPYING and CONTRIBUTORS files for details.
*/
#ifndef SQUID__SRC_CLIENTINFO_H
#define SQUID__SRC_CLIENTINFO_H
+#if USE_DELAY_POOLS
+#include "BandwidthBucket.h"
+#endif
#include "base/ByteCounter.h"
#include "cbdata.h"
#include "enums.h"
#include "hash.h"
#include "ip/Address.h"
#include "LogTags.h"
#include "mem/forward.h"
#include "typedefs.h"
#include <deque>
#if USE_DELAY_POOLS
class CommQuotaQueue;
#endif
-class ClientInfo
+class ClientInfo : public hash_link
+#if USE_DELAY_POOLS
+ , public BandwidthBucket
+#endif
{
MEMPROXY_CLASS(ClientInfo);
public:
explicit ClientInfo(const Ip::Address &);
~ClientInfo();
- hash_link hash; /* must be first */
-
Ip::Address addr;
struct Protocol {
Protocol() : n_requests(0) {
memset(result_hist, 0, sizeof(result_hist));
}
int result_hist[LOG_TYPE_MAX];
int n_requests;
ByteCounter kbytes_in;
ByteCounter kbytes_out;
ByteCounter hit_kbytes_out;
} Http, Icp;
struct Cutoff {
Cutoff() : time(0), n_req(0), n_denied(0) {}
time_t time;
int n_req;
int n_denied;
} cutoff;
int n_established; /* number of current established connections */
time_t last_seen;
#if USE_DELAY_POOLS
- double writeSpeedLimit;///< Write speed limit in bytes per second, can be less than 1, if too close to zero this could result in timeouts from client
- double prevTime; ///< previous time when we checked
- double bucketSize; ///< how much can be written now
- double bucketSizeLimit; ///< maximum bucket size
bool writeLimitingActive; ///< Is write limiter active
bool firstTimeConnection;///< is this first time connection for this client
CommQuotaQueue *quotaQueue; ///< clients waiting for more write quota
int rationedQuota; ///< precomputed quota preserving fairness among clients
int rationedCount; ///< number of clients that will receive rationedQuota
- bool selectWaiting; ///< is between commSetSelect and commHandleWrite
bool eventWaiting; ///< waiting for commHandleWriteHelper event to fire
// all those functions access Comm fd_table and are defined in comm.cc
bool hasQueue() const; ///< whether any clients are waiting for write quota
bool hasQueue(const CommQuotaQueue*) const; ///< has a given queue
unsigned int quotaEnqueue(int fd); ///< client starts waiting in queue; create the queue if necessary
int quotaPeekFd() const; ///< retuns the next fd reservation
unsigned int quotaPeekReserv() const; ///< returns the next reserv. to pop
void quotaDequeue(); ///< pops queue head from queue
void kickQuotaQueue(); ///< schedule commHandleWriteHelper call
- int quotaForDequed(); ///< allocate quota for a just dequeued client
- void refillBucket(); ///< adds bytes to bucket based on rate and time
+
+ /* BandwidthBucket API */
+ virtual int quota() override; ///< allocate quota for a just dequeued client
+ virtual bool applyQuota(int &nleft, Comm::IoCallback *state) override;
+ virtual void scheduleWrite(Comm::IoCallback *state) override;
+ virtual void onFdClosed() override;
+ virtual void reduceBucket(int len) override;
void quotaDumpQueue(); ///< dumps quota queue for debugging
/**
* Configure client write limiting (note:"client" here means - IP). It is called
* by httpAccept in client_side.cc, where the initial bucket size (anInitialBurst)
* computed, using the configured maximum bucket vavlue and configured initial
* bucket value(50% by default).
*
* \param writeSpeedLimit is speed limit configured in config for this pool
* \param initialBurst is initial bucket size to use for this client(i.e. client can burst at first)
* \param highWatermark is maximum bucket value
*/
void setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark);
#endif /* USE_DELAY_POOLS */
};
#if USE_DELAY_POOLS
// a queue of Comm clients waiting for I/O quota controlled by delay pools
class CommQuotaQueue
{
CBDATA_CLASS(CommQuotaQueue);
public:
CommQuotaQueue(ClientInfo *info);
~CommQuotaQueue();
bool empty() const { return fds.empty(); }
size_t size() const { return fds.size(); }
int front() const { return fds.front(); }
=== modified file 'src/Makefile.am'
--- src/Makefile.am 2017-01-24 19:49:32 +0000
+++ src/Makefile.am 2017-01-30 13:45:53 +0000
@@ -61,81 +61,87 @@
snmp_core.cc \
snmp_agent.h \
snmp_agent.cc
if ENABLE_SNMP
SNMP_SOURCE = $(SNMP_ALL_SOURCE)
SUBDIRS += snmp
SNMP_LIBS = snmp/libsnmp.la $(SNMPLIB)
else
SNMP_SOURCE =
endif
DIST_SUBDIRS += snmp
if ENABLE_ADAPTATION
SUBDIRS += adaptation
endif
DIST_SUBDIRS += adaptation
if ENABLE_ESI
SUBDIRS += esi
ESI_LIBS = \
esi/libesi.la \
$(top_builddir)/lib/libTrie/libTrie.a \
$(XMLLIB) \
$(EXPATLIB)
else
ESI_LIBS =
endif
DIST_SUBDIRS += esi
DELAY_POOL_ALL_SOURCE = \
+ BandwidthBucket.cc \
+ BandwidthBucket.h \
CommonPool.h \
CompositePoolNode.h \
delay_pools.cc \
DelayId.cc \
DelayId.h \
DelayIdComposite.h \
DelayBucket.cc \
DelayBucket.h \
DelayConfig.cc \
DelayConfig.h \
DelayPool.cc \
DelayPool.h \
DelayPools.h \
DelaySpec.cc \
DelaySpec.h \
DelayTagged.cc \
DelayTagged.h \
DelayUser.cc \
DelayUser.h \
DelayVector.cc \
DelayVector.h \
+ MessageBucket.cc \
+ MessageBucket.h \
+ MessageDelayPools.h \
+ MessageDelayPools.cc \
NullDelayId.h \
ClientDelayConfig.cc \
ClientDelayConfig.h
if ENABLE_DELAY_POOLS
DELAY_POOL_SOURCE = $(DELAY_POOL_ALL_SOURCE)
else
DELAY_POOL_SOURCE =
endif
if ENABLE_XPROF_STATS
XPROF_STATS_SOURCE = ProfStats.cc
else
XPROF_STATS_SOURCE =
endif
if ENABLE_HTCP
HTCPSOURCE = htcp.cc htcp.h
endif
if ENABLE_LEAKFINDER
LEAKFINDERSOURCE = LeakFinder.cc
else
LEAKFINDERSOURCE =
endif
if ENABLE_UNLINKD
UNLINKDSOURCE = unlinkd.h unlinkd.cc
UNLINKD = unlinkd
else
=== added file 'src/MessageBucket.cc'
--- src/MessageBucket.cc 1970-01-01 00:00:00 +0000
+++ src/MessageBucket.cc 2017-01-30 13:45:54 +0000
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+#include "comm/Connection.h"
+#include "DelayPools.h"
+#include "fde.h"
+#include "MessageBucket.h"
+
+MessageBucket::MessageBucket(const int speed, const int initialLevelPercent,
+ const double sizeLimit, MessageDelayPool::Pointer pool) :
+ BandwidthBucket(speed, initialLevelPercent, sizeLimit),
+ theAggregate(pool) {}
+
+int
+MessageBucket::quota()
+{
+ refillBucket();
+ theAggregate->refillBucket();
+ if (theAggregate->noLimit())
+ return bucketLevel;
+ else if (noLimit())
+ return theAggregate->level();
+ else
+ return min(bucketLevel, static_cast<double>(theAggregate->level()));
+}
+
+void
+MessageBucket::reduceBucket(int len)
+{
+ BandwidthBucket::reduceBucket(len);
+ theAggregate->bytesIn(len);
+}
+
+void
+MessageBucket::scheduleWrite(Comm::IoCallback *state)
+{
+ fde *F = &fd_table[state->conn->fd];
+ if (!F->writeQuotaHandler->selectWaiting) {
+ F->writeQuotaHandler->selectWaiting = true;
+ // message delay pools limit this write; see checkTimeouts()
+ SetSelect(state->conn->fd, COMM_SELECT_WRITE, Comm::HandleWrite, state, 0);
+ }
+}
+
+#endif /* USE_DELAY_POOLS */
+
=== added file 'src/MessageBucket.h'
--- src/MessageBucket.h 1970-01-01 00:00:00 +0000
+++ src/MessageBucket.h 2017-01-30 13:45:54 +0000
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef MESSAGEBUCKET_H
+#define MESSAGEBUCKET_H
+
+#if USE_DELAY_POOLS
+
+#include "BandwidthBucket.h"
+#include "base/RefCount.h"
+#include "comm/forward.h"
+#include "MessageDelayPools.h"
+
+/// Limits Squid-to-client bandwidth for each matching response
+class MessageBucket : public RefCountable, public BandwidthBucket
+{
+ MEMPROXY_CLASS(MessageBucket);
+
+public:
+ typedef RefCount<MessageBucket> Pointer;
+
+ MessageBucket(const int speed, const int initialLevelPercent, const double sizeLimit, MessageDelayPool::Pointer pool);
+
+ /* BandwidthBucket API */
+ virtual int quota() override;
+ virtual void scheduleWrite(Comm::IoCallback *state) override;
+ virtual void reduceBucket(int len);
+
+private:
+ MessageDelayPool::Pointer theAggregate;
+};
+
+#endif /* USE_DELAY_POOLS */
+
+#endif
+
=== added file 'src/MessageDelayPools.cc'
--- src/MessageDelayPools.cc 1970-01-01 00:00:00 +0000
+++ src/MessageDelayPools.cc 2017-01-30 13:45:54 +0000
@@ -0,0 +1,202 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+#include "acl/Gadgets.h"
+#include "cache_cf.h"
+#include "ConfigParser.h"
+#include "DelaySpec.h"
+#include "event.h"
+#include "MessageBucket.h"
+#include "MessageDelayPools.h"
+#include "Parsing.h"
+#include "SquidTime.h"
+#include "Store.h"
+
+#include <algorithm>
+#include <map>
+
+MessageDelayPools::~MessageDelayPools()
+{
+ freePools();
+}
+
+MessageDelayPools *
+MessageDelayPools::Instance()
+{
+ static MessageDelayPools pools;
+ return &pools;
+}
+
+MessageDelayPool::Pointer
+MessageDelayPools::pool(const SBuf &name)
+{
+ auto it = std::find_if(pools.begin(), pools.end(),
+ [&name](const MessageDelayPool::Pointer p) { return p->poolName == name; });
+ return it == pools.end() ? 0 : *it;
+}
+
+void
+MessageDelayPools::add(MessageDelayPool *p)
+{
+ const auto it = std::find_if(pools.begin(), pools.end(),
+ [&p](const MessageDelayPool::Pointer mp) { return mp->poolName == p->poolName; });
+ if (it != pools.end()) {
+ debugs(3, DBG_CRITICAL, "WARNING: Ignoring duplicate " << p->poolName << " response delay pool");
+ return;
+ }
+ pools.push_back(p);
+}
+
+void
+MessageDelayPools::freePools()
+{
+ pools.clear();
+}
+
+MessageDelayPool::MessageDelayPool(const SBuf &name, int64_t bucketSpeed, int64_t bucketSize,
+ int64_t aggregateSpeed, int64_t aggregateSize, uint16_t initialBucketPercent):
+ access(0),
+ poolName(name),
+ individualRestore(bucketSpeed),
+ individualMaximum(bucketSize),
+ aggregateRestore(aggregateSpeed),
+ aggregateMaximum(aggregateSize),
+ initialBucketLevel(initialBucketPercent),
+ lastUpdate(squid_curtime)
+{
+ theBucket.level() = aggregateMaximum;
+}
+
+MessageDelayPool::~MessageDelayPool()
+{
+ if (access)
+ aclDestroyAccessList(&access);
+}
+
+void
+MessageDelayPool::refillBucket()
+{
+ if (noLimit())
+ return;
+ const int incr = squid_curtime - lastUpdate;
+ if (incr >= 1) {
+ lastUpdate = squid_curtime;
+ DelaySpec spec;
+ spec.restore_bps = aggregateRestore;
+ spec.max_bytes = aggregateMaximum;
+ theBucket.update(spec, incr);
+ }
+}
+
+void
+MessageDelayPool::dump(StoreEntry *entry) const
+{
+ SBuf name("response_delay_pool_access ");
+ name.append(poolName);
+ dump_acl_access(entry, name.c_str(), access);
+ storeAppendPrintf(entry, "response_delay_pool parameters %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %d\n",
+ individualRestore, individualMaximum, aggregateRestore, aggregateMaximum, initialBucketLevel);
+ storeAppendPrintf(entry, "\n");
+}
+
+MessageBucket::Pointer
+MessageDelayPool::createBucket()
+{
+ return new MessageBucket(individualRestore, initialBucketLevel, individualMaximum, this);
+}
+
+void
+MessageDelayConfig::parseResponseDelayPool()
+{
+ static const SBuf bucketSpeedLimit("individual-restore");
+ static const SBuf maxBucketSize("individual-maximum");
+ static const SBuf aggregateSpeedLimit("aggregate-restore");
+ static const SBuf maxAggregateSize("aggregate-maximum");
+ static const SBuf initialBucketPercent("initial-bucket-level");
+
+ static std::map<SBuf, int64_t> params;
+ params[bucketSpeedLimit] = -1;
+ params[maxBucketSize] = -1;
+ params[aggregateSpeedLimit] = -1;
+ params[maxAggregateSize] = -1;
+ params[initialBucketPercent] = 50;
+
+ const SBuf name(ConfigParser::NextToken());
+ if (name.isEmpty()) {
+ debugs(3, DBG_CRITICAL, "FATAL: response_delay_pool missing required \"name\" parameter.");
+ self_destruct();
+ }
+
+ char *key = nullptr;
+ char *value = nullptr;
+ while (ConfigParser::NextKvPair(key, value)) {
+ if (!value) {
+ debugs(3, DBG_CRITICAL, "FATAL: '" << key << "' option missing value");
+ self_destruct();
+ }
+ auto it = params.find(SBuf(key));
+ if (it == params.end()) {
+ debugs(3, DBG_CRITICAL, "FATAL: response_delay_pool unknown option '" << key << "'");
+ self_destruct();
+ }
+ it->second = (it->first == initialBucketPercent) ? xatos(value) : xatoll(value, 10);
+ }
+
+ const char *fatalMsg = nullptr;
+ if ((params[bucketSpeedLimit] < 0) != (params[maxBucketSize] < 0))
+ fatalMsg = "'individual-restore' and 'individual-maximum'";
+ else if ((params[aggregateSpeedLimit] < 0) != (params[maxAggregateSize] < 0))
+ fatalMsg = "'aggregate-restore' and 'aggregate-maximum'";
+
+ if (fatalMsg) {
+ debugs(3, DBG_CRITICAL, "FATAL: must use " << fatalMsg << " options in conjunction");
+ self_destruct();
+ }
+
+ MessageDelayPool *pool = new MessageDelayPool(name,
+ params[bucketSpeedLimit],
+ params[maxBucketSize],
+ params[aggregateSpeedLimit],
+ params[maxAggregateSize],
+ static_cast<uint16_t>(params[initialBucketPercent])
+ );
+ MessageDelayPools::Instance()->add(pool);
+}
+
+void
+MessageDelayConfig::parseResponseDelayPoolAccess() {
+ const char *token = ConfigParser::NextToken();
+ if (!token) {
+ debugs(3, DBG_CRITICAL, "ERROR: required pool_name option missing");
+ return;
+ }
+ MessageDelayPool::Pointer pool = MessageDelayPools::Instance()->pool(SBuf(token));
+ static ConfigParser parser;
+ if (pool)
+ aclParseAccessLine("response_delay_pool_access", parser, &pool->access);
+}
+
+void
+MessageDelayConfig::freePools()
+{
+ MessageDelayPools::Instance()->freePools();
+}
+
+void
+MessageDelayConfig::dumpResponseDelayPoolParameters(StoreEntry *entry, const char *name)
+{
+ auto &pools = MessageDelayPools::Instance()->pools;
+ for (auto pool: pools)
+ pool->dump(entry);
+}
+
+#endif
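
For readers following parseResponseDelayPool() above, here is a minimal standalone sketch of the same key=value handling and the "options must be used in conjunction" check. It is an illustration only, not part of the patch: it assumes std::string and std::optional in place of SBuf and self_destruct(), requires C++17, and the names Params and checkResponseDelayPoolOptions are hypothetical.

```cpp
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the parsed response_delay_pool options.
using Params = std::map<std::string, int64_t>;

// Returns an error message, or std::nullopt when the options are acceptable.
std::optional<std::string>
checkResponseDelayPoolOptions(const std::vector<std::pair<std::string, int64_t>> &kv, Params &params)
{
    // -1 means "not configured"; the initial level defaults to 50 percent.
    params = {
        {"individual-restore", -1},
        {"individual-maximum", -1},
        {"aggregate-restore", -1},
        {"aggregate-maximum", -1},
        {"initial-bucket-level", 50}
    };

    for (const auto &[key, value] : kv) {
        const auto it = params.find(key);
        if (it == params.end())
            return "unknown option '" + key + "'";
        it->second = value;
    }

    // A restore rate without a maximum (or vice versa) is rejected.
    const auto unset = [&params](const char *name) { return params.at(name) < 0; };
    if (unset("individual-restore") != unset("individual-maximum"))
        return std::string("must use 'individual-restore' and 'individual-maximum' options in conjunction");
    if (unset("aggregate-restore") != unset("aggregate-maximum"))
        return std::string("must use 'aggregate-restore' and 'aggregate-maximum' options in conjunction");

    return std::nullopt;
}
```

Comparing the two "is unset" flags with != accepts a pair only when both options are given or both are omitted, which mirrors the (a < 0) != (b < 0) test in the patch.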
+
=== added file 'src/MessageDelayPools.h'
--- src/MessageDelayPools.h 1970-01-01 00:00:00 +0000
+++ src/MessageDelayPools.h 2017-01-30 13:45:54 +0000
@@ -0,0 +1,134 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef MESSAGEDELAYPOOLS_H
+#define MESSAGEDELAYPOOLS_H
+
+#if USE_DELAY_POOLS
+
+#include "acl/Acl.h"
+#include "base/RefCount.h"
+#include "DelayBucket.h"
+#include "DelayPools.h"
+
+class MessageBucket;
+typedef RefCount<MessageBucket> MessageBucketPointer;
+
+/// \ingroup DelayPoolsAPI
+/// Represents one 'response' delay pool, creates individual response
+/// buckets and performs aggregate limiting for them
+class MessageDelayPool : public RefCountable
+{
+public:
+ typedef RefCount<MessageDelayPool> Pointer;
+
+ MessageDelayPool(const SBuf &name, int64_t bucketSpeed, int64_t bucketSize,
+ int64_t aggregateSpeed, int64_t aggregateSize, uint16_t initialBucketPercent);
+ ~MessageDelayPool();
+ MessageDelayPool(const MessageDelayPool &) = delete;
+ MessageDelayPool &operator=(const MessageDelayPool &) = delete;
+
+ /// Increases the aggregate bucket level with the aggregateRestore speed.
+ void refillBucket();
+ /// decreases the aggregate level
+ void bytesIn(int qty) { if (!noLimit()) theBucket.bytesIn(qty); }
+ /// current aggregate level
+ int level() { return theBucket.level(); }
+ /// creates an individual response bucket
+ MessageBucketPointer createBucket();
+ /// whether the aggregate bucket has no limit
+ bool noLimit () const { return aggregateRestore < 0; }
+
+ void dump (StoreEntry * entry) const;
+
+ acl_access *access;
+ /// the response delay pool name
+ SBuf poolName;
+ /// the speed limit of an individual bucket (bytes/s)
+ int64_t individualRestore;
+ /// the maximum size of an individual bucket
+ int64_t individualMaximum;
+ /// the speed limit of the aggregate bucket (bytes/s)
+ int64_t aggregateRestore;
+ /// the maximum size of the aggregate bucket
+ int64_t aggregateMaximum;
+ /// the initial bucket size as a percentage of individualMaximum
+ uint16_t initialBucketLevel;
+ /// the aggregate bucket
+ DelayBucket theBucket;
+
+private:
+ /// Time the aggregate bucket level was last refilled.
+ time_t lastUpdate;
+};
+
+/// \ingroup DelayPoolsAPI
+/// represents all configured 'response' delay pools
+class MessageDelayPools
+{
+public:
+ MessageDelayPools(const MessageDelayPools &) = delete;
+ MessageDelayPools &operator=(const MessageDelayPools &) = delete;
+
+ static MessageDelayPools *Instance();
+
+ /// returns a MessageDelayPool with a given name or null otherwise
+ MessageDelayPool::Pointer pool(const SBuf &name);
+ /// appends a single MessageDelayPool, created during configuration
+ void add(MessageDelayPool *pool);
+ /// memory cleanup, performed during reconfiguration
+ void freePools();
+
+ std::vector<MessageDelayPool::Pointer> pools;
+
+private:
+ MessageDelayPools() {}
+ ~MessageDelayPools();
+ void Stats() { } // TODO
+};
+
+/// represents configuration for response delay pools
+class MessageDelayConfig
+{
+public:
+ void parseResponseDelayPool();
+ void dumpResponseDelayPoolParameters(StoreEntry *e, const char *name);
+ void parseResponseDelayPoolAccess();
+ void freePools();
+};
+
+#define free_response_delay_pool_access(X)
+#define dump_response_delay_pool_access(X, Y, Z)
+
+inline void
+free_response_delay_pool_parameters(MessageDelayConfig * cfg)
+{
+ cfg->freePools();
+}
+
+inline void
+dump_response_delay_pool_parameters(StoreEntry *entry, const char *name, MessageDelayConfig &cfg)
+{
+ cfg.dumpResponseDelayPoolParameters(entry, name);
+}
+
+inline void
+parse_response_delay_pool_parameters(MessageDelayConfig * cfg)
+{
+ cfg->parseResponseDelayPool();
+}
+
+inline void
+parse_response_delay_pool_access(MessageDelayConfig * cfg)
+{
+ cfg->parseResponseDelayPoolAccess();
+}
+
+#endif
+#endif
+
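
The header above pairs one aggregate bucket per pool with the individual buckets returned by createBucket(). As a rough illustration of how the smallest bucket is meant to determine the write size (as discussed at the top of this message), here is a simplified standalone sketch. Bucket, writeQuota and chargeWrite are hypothetical names, whole-second refills are an assumption, and this is not the DelayBucket or MessageBucket code.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical, simplified bucket: level grows at `restore` bytes/s up to `maximum`.
struct Bucket {
    int64_t restore;   // bytes per second; negative means "no limit"
    int64_t maximum;   // bucket capacity in bytes
    int64_t level;     // bytes currently available

    bool noLimit() const { return restore < 0; }

    // Adds `seconds` worth of bytes, never exceeding the capacity.
    void refill(const int64_t seconds) {
        if (!noLimit())
            level = std::min(maximum, level + restore * seconds);
    }

    // Removes written bytes, never dropping below zero.
    void consume(const int64_t bytes) {
        if (!noLimit())
            level = std::max<int64_t>(0, level - bytes);
    }
};

// The smallest limited bucket decides how many bytes may be written now.
int64_t writeQuota(const Bucket &individual, const Bucket &aggregate, const int64_t wanted) {
    int64_t quota = wanted;
    if (!individual.noLimit())
        quota = std::min(quota, individual.level);
    if (!aggregate.noLimit())
        quota = std::min(quota, aggregate.level);
    return quota;
}

// Both buckets are charged with what was actually written.
void chargeWrite(Bucket &individual, Bucket &aggregate, const int64_t written) {
    individual.consume(written);
    aggregate.consume(written);
}
```

An unlimited bucket (negative restore) is skipped in both steps, which corresponds to the noLimit() guard in MessageDelayPool::bytesIn().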
=== modified file 'src/SquidConfig.h'
--- src/SquidConfig.h 2017-01-01 00:12:22 +0000
+++ src/SquidConfig.h 2017-01-30 13:45:55 +0000
@@ -1,49 +1,54 @@
/*
* Copyright (C) 1996-2017 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
* Please see the COPYING and CONTRIBUTORS files for details.
*/
#ifndef SQUID_SQUIDCONFIG_H_
#define SQUID_SQUIDCONFIG_H_
#include "acl/forward.h"
#include "base/RefCount.h"
#include "base/YesNoNone.h"
+#if USE_DELAY_POOLS
#include "ClientDelayConfig.h"
#include "DelayConfig.h"
+#endif
#include "helper/ChildConfig.h"
#include "HttpHeaderTools.h"
#include "ip/Address.h"
+#if USE_DELAY_POOLS
+#include "MessageDelayPools.h"
+#endif
#include "Notes.h"
#include "security/forward.h"
#include "SquidTime.h"
#if USE_OPENSSL
#include "ssl/support.h"
#endif
#include "store/forward.h"
#if USE_OPENSSL
class sslproxy_cert_sign;
class sslproxy_cert_adapt;
#endif
namespace Mgr
{
class ActionPasswordList;
} // namespace Mgr
class CachePeer;
class CustomLog;
class CpuAffinityMap;
class external_acl;
class HeaderManglers;
class RefreshPattern;
class RemovalPolicySettings;
namespace AnyP
{
class PortCfg;
}
@@ -407,60 +412,61 @@
int eprt;
int sanitycheck;
int telnet;
} Ftp;
RefreshPattern *Refresh;
Store::DiskConfig cacheSwap;
struct {
char *directory;
int use_short_names;
} icons;
char *errorDirectory;
#if USE_ERR_LOCALES
char *errorDefaultLanguage;
int errorLogMissingLanguages;
#endif
char *errorStylesheet;
struct {
int onerror;
} retry;
struct {
int64_t limit;
} MemPools;
#if USE_DELAY_POOLS
DelayConfig Delay;
ClientDelayConfig ClientDelay;
+ MessageDelayConfig MessageDelay;
#endif
struct {
struct {
int average;
int min_poll;
} dns, udp, tcp;
} comm_incoming;
int max_open_disk_fds;
int uri_whitespace;
AclSizeLimit *rangeOffsetLimit;
#if MULTICAST_MISS_STREAM
struct {
Ip::Address addr;
int ttl;
unsigned short port;
char *encode_key;
} mcast_miss;
#endif
/// request_header_access and request_header_replace
HeaderManglers *request_header_access;
/// reply_header_access and reply_header_replace
HeaderManglers *reply_header_access;
///request_header_add access list
HeaderWithAclList *request_header_add;
///reply_header_add access list
HeaderWithAclList *reply_header_add;
=== modified file 'src/cache_cf.cc'
--- src/cache_cf.cc 2017-01-01 00:12:22 +0000
+++ src/cache_cf.cc 2017-01-30 13:46:33 +0000
@@ -15,60 +15,61 @@
#include "acl/Address.h"
#include "acl/Gadgets.h"
#include "acl/MethodData.h"
#include "acl/Tree.h"
#include "anyp/PortCfg.h"
#include "anyp/UriScheme.h"
#include "auth/Config.h"
#include "auth/Scheme.h"
#include "AuthReg.h"
#include "base/RunnersRegistry.h"
#include "cache_cf.h"
#include "CachePeer.h"
#include "ConfigParser.h"
#include "CpuAffinityMap.h"
#include "DiskIO/DiskIOModule.h"
#include "eui/Config.h"
#include "ExternalACL.h"
#include "format/Format.h"
#include "ftp/Elements.h"
#include "globals.h"
#include "HttpHeaderTools.h"
#include "icmp/IcmpConfig.h"
#include "ident/Config.h"
#include "ip/Intercept.h"
#include "ip/QosConfig.h"
#include "ip/tools.h"
#include "ipc/Kids.h"
#include "log/Config.h"
#include "log/CustomLog.h"
#include "MemBuf.h"
+#include "MessageDelayPools.h"
#include "mgr/ActionPasswordList.h"
#include "mgr/Registration.h"
#include "neighbors.h"
#include "NeighborTypeDomainList.h"
#include "Parsing.h"
#include "pconn.h"
#include "PeerDigest.h"
#include "PeerPoolMgr.h"
#include "redirect.h"
#include "RefreshPattern.h"
#include "rfc1738.h"
#include "sbuf/List.h"
#include "SquidConfig.h"
#include "SquidString.h"
#include "ssl/ProxyCerts.h"
#include "Store.h"
#include "store/Disk.h"
#include "store/Disks.h"
#include "StoreFileSystem.h"
#include "tools.h"
#include "util.h"
#include "wordlist.h"
/* wccp2 has its own conditional definitions */
#include "wccp2.h"
#if USE_ADAPTATION
#include "adaptation/Config.h"
#endif
#if ICAP_CLIENT
#include "adaptation/icap/Config.h"
#endif
@@ -1627,61 +1628,61 @@
}
static void
parse_delay_pool_rates(DelayConfig * cfg)
{
cfg->parsePoolRates();
}
static void
parse_delay_pool_access(DelayConfig * cfg)
{
cfg->parsePoolAccess(LegacyParser);
}
#endif
#if USE_DELAY_POOLS
#include "ClientDelayConfig.h"
/* do nothing - free_client_delay_pool_count is the magic free function.
* this is why client_delay_pool_count isn't just marked TYPE: u_short
*/
#define free_client_delay_pool_access(X)
#define free_client_delay_pool_rates(X)
#define dump_client_delay_pool_access(X, Y, Z)
#define dump_client_delay_pool_rates(X, Y, Z)
static void
free_client_delay_pool_count(ClientDelayConfig * cfg)
{
- cfg->freePoolCount();
+ cfg->freePools();
}
static void
dump_client_delay_pool_count(StoreEntry * entry, const char *name, ClientDelayConfig &cfg)
{
cfg.dumpPoolCount (entry, name);
}
static void
parse_client_delay_pool_count(ClientDelayConfig * cfg)
{
cfg->parsePoolCount();
}
static void
parse_client_delay_pool_rates(ClientDelayConfig * cfg)
{
cfg->parsePoolRates();
}
static void
parse_client_delay_pool_access(ClientDelayConfig * cfg)
{
cfg->parsePoolAccess(LegacyParser);
}
#endif
#if USE_HTTP_VIOLATIONS
static void
dump_http_header_access(StoreEntry * entry, const char *name, const HeaderManglers *manglers)
=== modified file 'src/cf.data.depend'
--- src/cf.data.depend 2017-01-01 00:12:22 +0000
+++ src/cf.data.depend 2017-01-22 08:42:37 +0000
@@ -6,60 +6,62 @@
##
#
# type dependencies
#
access_log acl logformat
acl external_acl_type auth_param
acl_access acl
acl_address acl
acl_b_size_t acl
acl_tos acl
acl_nfmark acl
address
authparam
AuthSchemes acl auth_param
b_int64_t
b_size_t
b_ssize_t
cachedir cache_replacement_policy
cachemgrpasswd
ConfigAclTos
configuration_includes_quoted_values
CpuAffinityMap
debug
delay_pool_access acl delay_class
delay_pool_class delay_pools
delay_pool_count
delay_pool_rates delay_class
client_delay_pool_access acl
client_delay_pool_count
client_delay_pool_rates
+response_delay_pool_access acl
+response_delay_pool_parameters
denyinfo acl
eol
externalAclHelper auth_param
HelperChildConfig
hostdomain cache_peer
hostdomaintype cache_peer
http_header_access acl
http_header_replace
HeaderWithAclList acl
adaptation_access_type adaptation_service_set adaptation_service_chain acl icap_service icap_class
adaptation_service_set_type icap_service ecap_service
adaptation_service_chain_type icap_service ecap_service
icap_access_type icap_class acl
icap_class_type icap_service
icap_service_type
icap_service_failure_limit
icmp
ecap_service_type
int
int64_t
kb_int64_t
kb_size_t
logformat
YesNoNone
memcachemode
note acl
obsolete
onoff
on_unsupported_protocol acl
peer
=== modified file 'src/cf.data.pre'
--- src/cf.data.pre 2017-01-30 18:10:15 +0000
+++ src/cf.data.pre 2017-01-30 22:18:05 +0000
@@ -7158,60 +7158,118 @@
request:
client_delay_access pool_ID allow|deny acl_name
All client_delay_access options are checked in their pool ID
order, starting with pool 1. The first checked pool with allowed
request is selected for the request. If no ACL matches or there
are no client_delay_access options, the request bandwidth is not
limited.
The ACL-selected pool is then used to find the
client_delay_parameters for the request. Client-side pools are
not used to aggregate clients. Clients are always aggregated
based on their source IP addresses (one bucket per source IP).
This clause only supports fast acl types.
See http://wiki.squid-cache.org/SquidFaq/SquidAcl for details.
Additionally, only the client TCP connection details are available.
ACLs testing HTTP properties will not work.
Please see delay_access for more examples.
Example:
client_delay_access 1 allow low_rate_network
client_delay_access 2 allow vips_network
See also client_delay_parameters and client_delay_pools.
DOC_END
+NAME: response_delay_pool
+TYPE: response_delay_pool_parameters
+DEFAULT: none
+IFDEF: USE_DELAY_POOLS
+LOC: Config.MessageDelay
+DOC_START
+ This option configures client response bandwidth limits using the
+ following format:
+
+ response_delay_pool name [option=value] ...
+
+ name the response delay pool name
+
+ available options:
+
+ individual-restore The speed limit of an individual
+ bucket (bytes/s). To be used in conjunction
+ with 'individual-maximum'.
+
+ individual-maximum The maximum number of bytes which can
+ be placed into the individual bucket. To be used
+ in conjunction with 'individual-restore'.
+
+ aggregate-restore The speed limit for the aggregate
+ bucket (bytes/s). To be used in conjunction with
+ 'aggregate-maximum'.
+
+ aggregate-maximum The maximum number of bytes which can
+ be placed into the aggregate bucket. To be used
+ in conjunction with 'aggregate-restore'.
+
+ initial-bucket-level The initial bucket size as a percentage
+ of individual-maximum (default: 50).
+
+ Individual and/or aggregate bucket options may be omitted,
+ meaning no individual and/or aggregate speed limit is applied.
+ See also response_delay_pool_access and delay_parameters for
+ terminology details.
+DOC_END
+
+NAME: response_delay_pool_access
+TYPE: response_delay_pool_access
+DEFAULT: none
+DEFAULT_DOC: Deny use of the pool, unless allow rules exist in squid.conf for the pool.
+IFDEF: USE_DELAY_POOLS
+LOC: Config.MessageDelay
+DOC_START
+ Determines whether a specific named response delay pool is used
+ for the transaction. The syntax for this directive is:
+
+ response_delay_pool_access pool_name allow|deny acl_name
+
+ All response_delay_pool_access options are checked in the order
+ they appear in this configuration file. The first rule with a
+ matching ACL wins. If (and only if) an "allow" rule won, Squid
+ assigns the response to the corresponding named delay pool.
+DOC_END
+
COMMENT_START
WCCPv1 AND WCCPv2 CONFIGURATION OPTIONS
-----------------------------------------------------------------------------
COMMENT_END
NAME: wccp_router
TYPE: address
LOC: Config.Wccp.router
DEFAULT: any_addr
DEFAULT_DOC: WCCP disabled.
IFDEF: USE_WCCP
DOC_START
Use this option to define your WCCP ``home'' router for
Squid.
wccp_router supports a single WCCP(v1) router
wccp2_router supports multiple WCCPv2 routers
only one of the two may be used at the same time and defines
which version of WCCP to use.
DOC_END
NAME: wccp2_router
TYPE: IpAddress_list
LOC: Config.Wccp2.router
DEFAULT: none
DEFAULT_DOC: WCCPv2 disabled.
IFDEF: USE_WCCPv2
DOC_START
=== modified file 'src/client_db.cc'
--- src/client_db.cc 2017-01-01 00:12:22 +0000
+++ src/client_db.cc 2017-01-30 13:46:34 +0000
@@ -26,94 +26,85 @@
#include "tools.h"
#if SQUID_SNMP
#include "snmp_core.h"
#endif
static hash_table *client_table = NULL;
static ClientInfo *clientdbAdd(const Ip::Address &addr);
static FREE clientdbFreeItem;
static void clientdbStartGC(void);
static void clientdbScheduledGC(void *);
#if USE_DELAY_POOLS
static int max_clients = 32768;
#else
static int max_clients = 32;
#endif
static int cleanup_running = 0;
static int cleanup_scheduled = 0;
static int cleanup_removed;
#if USE_DELAY_POOLS
#define CLIENT_DB_HASH_SIZE 65357
#else
#define CLIENT_DB_HASH_SIZE 467
#endif
ClientInfo::ClientInfo(const Ip::Address &ip) :
+#if USE_DELAY_POOLS
+ BandwidthBucket(0, 0, 0),
+#endif
addr(ip),
n_established(0),
last_seen(0)
#if USE_DELAY_POOLS
- , writeSpeedLimit(0),
- prevTime(0),
- bucketSize(0),
- bucketSizeLimit(0),
- writeLimitingActive(false),
+ , writeLimitingActive(false),
firstTimeConnection(true),
quotaQueue(nullptr),
rationedQuota(0),
rationedCount(0),
- selectWaiting(false),
eventWaiting(false)
#endif
{
debugs(77, 9, "ClientInfo constructed, this=" << static_cast<void*>(this));
-
-#if USE_DELAY_POOLS
- getCurrentTime();
- /* put current time to have something sensible here */
- prevTime = current_dtime;
-#endif
-
char *buf = static_cast<char*>(xmalloc(MAX_IPSTRLEN)); // becomes hash.key
- hash.key = addr.toStr(buf,MAX_IPSTRLEN);
+ key = addr.toStr(buf,MAX_IPSTRLEN);
}
static ClientInfo *
clientdbAdd(const Ip::Address &addr)
{
ClientInfo *c = new ClientInfo(addr);
- hash_join(client_table, &c->hash);
+ hash_join(client_table, static_cast<hash_link*>(c));
++statCounter.client_http.clients;
if ((statCounter.client_http.clients > max_clients) && !cleanup_running && cleanup_scheduled < 2) {
++cleanup_scheduled;
eventAdd("client_db garbage collector", clientdbScheduledGC, NULL, 90, 0);
}
return c;
}
static void
clientdbInit(void)
{
if (client_table)
return;
client_table = hash_create((HASHCMP *) strcmp, CLIENT_DB_HASH_SIZE, hash_string);
}
class ClientDbRr: public RegisteredRunner
{
public:
/* RegisteredRunner API */
virtual void useConfig();
};
RunnerRegistrationEntry(ClientDbRr);
void
ClientDbRr::useConfig()
{
@@ -250,245 +241,237 @@
NR = 150;
ND = c->Icp.result_hist[LOG_UDP_DENIED] - c->cutoff.n_denied;
p = 100.0 * ND / NR;
if (p < 95.0)
return 0;
debugs(1, DBG_CRITICAL, "WARNING: Probable misconfigured neighbor at " << key);
debugs(1, DBG_CRITICAL, "WARNING: " << ND << " of the last " << NR <<
" ICP replies are DENIED");
debugs(1, DBG_CRITICAL, "WARNING: No replies will be sent for the next " <<
CUTOFF_SECONDS << " seconds");
c->cutoff.time = squid_curtime;
c->cutoff.n_req = c->Icp.n_requests;
c->cutoff.n_denied = c->Icp.result_hist[LOG_UDP_DENIED];
return 1;
}
void
clientdbDump(StoreEntry * sentry)
{
const char *name;
- ClientInfo *c;
int icp_total = 0;
int icp_hits = 0;
int http_total = 0;
int http_hits = 0;
storeAppendPrintf(sentry, "Cache Clients:\n");
hash_first(client_table);
- while ((c = (ClientInfo *) hash_next(client_table))) {
- storeAppendPrintf(sentry, "Address: %s\n", hashKeyStr(&c->hash));
+ while (hash_link *hash = hash_next(client_table)) {
+ const ClientInfo *c = static_cast<const ClientInfo *>(hash);
+ storeAppendPrintf(sentry, "Address: %s\n", hashKeyStr(hash));
if ( (name = fqdncache_gethostbyaddr(c->addr, 0)) ) {
storeAppendPrintf(sentry, "Name: %s\n", name);
}
storeAppendPrintf(sentry, "Currently established connections: %d\n",
c->n_established);
storeAppendPrintf(sentry, " ICP Requests %d\n",
c->Icp.n_requests);
for (LogTags_ot l = LOG_TAG_NONE; l < LOG_TYPE_MAX; ++l) {
if (c->Icp.result_hist[l] == 0)
continue;
icp_total += c->Icp.result_hist[l];
if (LOG_UDP_HIT == l)
icp_hits += c->Icp.result_hist[l];
storeAppendPrintf(sentry, " %-20.20s %7d %3d%%\n", LogTags(l).c_str(), c->Icp.result_hist[l], Math::intPercent(c->Icp.result_hist[l], c->Icp.n_requests));
}
storeAppendPrintf(sentry, " HTTP Requests %d\n", c->Http.n_requests);
for (LogTags_ot l = LOG_TAG_NONE; l < LOG_TYPE_MAX; ++l) {
if (c->Http.result_hist[l] == 0)
continue;
http_total += c->Http.result_hist[l];
if (LogTags(l).isTcpHit())
http_hits += c->Http.result_hist[l];
storeAppendPrintf(sentry,
" %-20.20s %7d %3d%%\n",
LogTags(l).c_str(),
c->Http.result_hist[l],
Math::intPercent(c->Http.result_hist[l], c->Http.n_requests));
}
storeAppendPrintf(sentry, "\n");
}
storeAppendPrintf(sentry, "TOTALS\n");
storeAppendPrintf(sentry, "ICP : %d Queries, %d Hits (%3d%%)\n",
icp_total, icp_hits, Math::intPercent(icp_hits, icp_total));
storeAppendPrintf(sentry, "HTTP: %d Requests, %d Hits (%3d%%)\n",
http_total, http_hits, Math::intPercent(http_hits, http_total));
}
static void
clientdbFreeItem(void *data)
{
ClientInfo *c = (ClientInfo *)data;
delete c;
}
ClientInfo::~ClientInfo()
{
- safe_free(hash.key);
+ safe_free(key);
#if USE_DELAY_POOLS
if (CommQuotaQueue *q = quotaQueue) {
q->clientInfo = NULL;
delete q; // invalidates cbdata, cancelling any pending kicks
}
#endif
debugs(77, 9, "ClientInfo destructed, this=" << static_cast<void*>(this));
}
void
clientdbFreeMemory(void)
{
hashFreeItems(client_table, clientdbFreeItem);
hashFreeMemory(client_table);
client_table = NULL;
}
static void
clientdbScheduledGC(void *)
{
cleanup_scheduled = 0;
clientdbStartGC();
}
static void
clientdbGC(void *)
{
static int bucket = 0;
hash_link *link_next;
link_next = hash_get_bucket(client_table, bucket++);
while (link_next != NULL) {
ClientInfo *c = (ClientInfo *)link_next;
int age = squid_curtime - c->last_seen;
link_next = link_next->next;
if (c->n_established)
continue;
if (age < 24 * 3600 && c->Http.n_requests > 100)
continue;
if (age < 4 * 3600 && (c->Http.n_requests > 10 || c->Icp.n_requests > 10))
continue;
if (age < 5 * 60 && (c->Http.n_requests > 1 || c->Icp.n_requests > 1))
continue;
if (age < 60)
continue;
- hash_remove_link(client_table, &c->hash);
+ hash_remove_link(client_table, static_cast<hash_link*>(c));
clientdbFreeItem(c);
--statCounter.client_http.clients;
++cleanup_removed;
}
if (bucket < CLIENT_DB_HASH_SIZE)
eventAdd("client_db garbage collector", clientdbGC, NULL, 0.15, 0);
else {
bucket = 0;
cleanup_running = 0;
max_clients = statCounter.client_http.clients * 3 / 2;
if (!cleanup_scheduled) {
cleanup_scheduled = 1;
eventAdd("client_db garbage collector", clientdbScheduledGC, NULL, 6 * 3600, 0);
}
debugs(49, 2, "clientdbGC: Removed " << cleanup_removed << " entries");
}
}
static void
clientdbStartGC(void)
{
max_clients = statCounter.client_http.clients;
cleanup_running = 1;
cleanup_removed = 0;
clientdbGC(NULL);
}
#if SQUID_SNMP
Ip::Address *
client_entry(Ip::Address *current)
{
- ClientInfo *c = NULL;
char key[MAX_IPSTRLEN];
+ hash_first(client_table);
if (current) {
current->toStr(key,MAX_IPSTRLEN);
- hash_first(client_table);
- while ((c = (ClientInfo *) hash_next(client_table))) {
- if (!strcmp(key, hashKeyStr(&c->hash)))
+ while (hash_link *hash = hash_next(client_table)) {
+ if (!strcmp(key, hashKeyStr(hash)))
break;
}
-
- c = (ClientInfo *) hash_next(client_table);
- } else {
- hash_first(client_table);
- c = (ClientInfo *) hash_next(client_table);
}
+ ClientInfo *c = static_cast<ClientInfo *>(hash_next(client_table));
+
hash_last(client_table);
- if (c)
- return (&c->addr);
- else
- return (NULL);
-
+ return c ? &c->addr : nullptr;
}
variable_list *
snmp_meshCtblFn(variable_list * Var, snint * ErrP)
{
char key[MAX_IPSTRLEN];
ClientInfo *c = NULL;
Ip::Address keyIp;
*ErrP = SNMP_ERR_NOERROR;
MemBuf tmp;
debugs(49, 6, HERE << "Current : length=" << Var->name_length << ": " << snmpDebugOid(Var->name, Var->name_length, tmp));
if (Var->name_length == 16) {
oid2addr(&(Var->name[12]), keyIp, 4);
} else if (Var->name_length == 28) {
oid2addr(&(Var->name[12]), keyIp, 16);
} else {
*ErrP = SNMP_ERR_NOSUCHNAME;
return NULL;
}
keyIp.toStr(key, sizeof(key));
debugs(49, 5, HERE << "[" << key << "] requested!");
c = (ClientInfo *) hash_lookup(client_table, key);
if (c == NULL) {
debugs(49, 5, HERE << "not found.");
*ErrP = SNMP_ERR_NOSUCHNAME;
return NULL;
}
=== modified file 'src/client_side.cc'
--- src/client_side.cc 2017-01-30 12:46:15 +0000
+++ src/client_side.cc 2017-01-30 22:18:05 +0000
@@ -93,60 +93,61 @@
#include "HttpReply.h"
#include "HttpRequest.h"
#include "ident/Config.h"
#include "ident/Ident.h"
#include "internal.h"
#include "ipc/FdNotes.h"
#include "ipc/StartListening.h"
#include "log/access_log.h"
#include "MemBuf.h"
#include "MemObject.h"
#include "mime_header.h"
#include "parser/Tokenizer.h"
#include "profiler/Profiler.h"
#include "rfc1738.h"
#include "security/NegotiationHistory.h"
#include "servers/forward.h"
#include "SquidConfig.h"
#include "SquidTime.h"
#include "StatCounters.h"
#include "StatHist.h"
#include "Store.h"
#include "TimeOrTag.h"
#include "tools.h"
#include "URL.h"
#if USE_AUTH
#include "auth/UserRequest.h"
#endif
#if USE_DELAY_POOLS
#include "ClientInfo.h"
+#include "MessageDelayPools.h"
#endif
#if USE_OPENSSL
#include "ssl/bio.h"
#include "ssl/context_storage.h"
#include "ssl/gadgets.h"
#include "ssl/helper.h"
#include "ssl/ProxyCerts.h"
#include "ssl/ServerBump.h"
#include "ssl/support.h"
#endif
// for tvSubUsec() which should be in SquidTime.h
#include "util.h"
#include <climits>
#include <cmath>
#include <limits>
#if LINGERING_CLOSE
#define comm_close comm_lingering_close
#endif
/// dials clientListenerConnectionOpened call
class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb
{
public:
typedef void (*Handler)(AnyP::PortCfgPointer &portCfg, const Ipc::FdNoteId note, const Subscription::Pointer &sub);
ListeningStartedDialer(Handler aHandler, AnyP::PortCfgPointer &aPortCfg, const Ipc::FdNoteId note, const Subscription::Pointer &aSub):
handler(aHandler), portCfg(aPortCfg), portTypeNote(note), sub(aSub) {}
@@ -2461,89 +2462,89 @@
AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, ConnStateData::connStateClosed);
comm_add_close_handler(clientConnection->fd, call);
if (Config.onoff.log_fqdn)
fqdncache_gethostbyaddr(clientConnection->remote, FQDN_LOOKUP_IF_MISS);
#if USE_IDENT
if (Ident::TheConfig.identLookup) {
ACLFilledChecklist identChecklist(Ident::TheConfig.identLookup, NULL, NULL);
identChecklist.src_addr = clientConnection->remote;
identChecklist.my_addr = clientConnection->local;
if (identChecklist.fastCheck() == ACCESS_ALLOWED)
Ident::Start(clientConnection, clientIdentDone, this);
}
#endif
clientdbEstablished(clientConnection->remote, 1);
needProxyProtocolHeader_ = port->flags.proxySurrogate;
if (needProxyProtocolHeader_) {
if (!proxyProtocolValidateClient()) // will close the connection on failure
return;
}
#if USE_DELAY_POOLS
fd_table[clientConnection->fd].clientInfo = NULL;
if (Config.onoff.client_db) {
/* it was said several times that client write limiter does not work if client_db is disabled */
- ClientDelayPools& pools(Config.ClientDelay.pools);
+ auto &pools = ClientDelayPools::Instance()->pools;
ACLFilledChecklist ch(NULL, NULL, NULL);
// TODO: we check early to limit error response bandwith but we
// should recheck when we can honor delay_pool_uses_indirect
// TODO: we should also pass the port details for myportname here.
ch.src_addr = clientConnection->remote;
ch.my_addr = clientConnection->local;
for (unsigned int pool = 0; pool < pools.size(); ++pool) {
/* pools require explicit 'allow' to assign a client into them */
- if (pools[pool].access) {
- ch.changeAcl(pools[pool].access);
+ if (pools[pool]->access) {
+ ch.changeAcl(pools[pool]->access);
allow_t answer = ch.fastCheck();
if (answer == ACCESS_ALLOWED) {
/* request client information from db after we did all checks
this will save hash lookup if client failed checks */
ClientInfo * cli = clientdbGetInfo(clientConnection->remote);
assert(cli);
/* put client info in FDE */
fd_table[clientConnection->fd].clientInfo = cli;
/* setup write limiter for this request */
const double burst = floor(0.5 +
- (pools[pool].highwatermark * Config.ClientDelay.initial)/100.0);
- cli->setWriteLimiter(pools[pool].rate, burst, pools[pool].highwatermark);
+ (pools[pool]->highwatermark * Config.ClientDelay.initial)/100.0);
+ cli->setWriteLimiter(pools[pool]->rate, burst, pools[pool]->highwatermark);
break;
} else {
debugs(83, 4, HERE << "Delay pool " << pool << " skipped because ACL " << answer);
}
}
}
}
#endif
// kids must extend to actually start doing something (e.g., reading)
}
/** Handle a new connection on an HTTP socket. */
void
httpAccept(const CommAcceptCbParams ¶ms)
{
MasterXaction::Pointer xact = params.xaction;
AnyP::PortCfgPointer s = xact->squidPort;
// NP: it is possible the port was reconfigured when the call or accept() was queued.
if (params.flag != Comm::OK) {
// Its possible the call was still queued when the client disconnected
debugs(33, 2, s->listenConn << ": accept failure: " << xstrerr(params.xerrno));
return;
}
debugs(33, 4, params.conn << ": accepted");
fd_note(params.conn->fd, "client http connect");
=== modified file 'src/client_side.h'
--- src/client_side.h 2017-01-30 12:46:15 +0000
+++ src/client_side.h 2017-01-30 22:18:05 +0000
@@ -1,59 +1,62 @@
/*
* Copyright (C) 1996-2017 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
* Please see the COPYING and CONTRIBUTORS files for details.
*/
/* DEBUG: section 33 Client-side Routines */
#ifndef SQUID_CLIENTSIDE_H
#define SQUID_CLIENTSIDE_H
#include "base/RunnersRegistry.h"
#include "clientStreamForward.h"
#include "comm.h"
#include "helper/forward.h"
#include "http/forward.h"
#include "HttpControlMsg.h"
#include "ipc/FdNotes.h"
#include "sbuf/SBuf.h"
#include "servers/Server.h"
#if USE_AUTH
#include "auth/UserRequest.h"
#endif
#if USE_OPENSSL
#include "security/Handshake.h"
#include "ssl/support.h"
#endif
+#if USE_DELAY_POOLS
+#include "MessageBucket.h"
+#endif
class ClientHttpRequest;
class HttpHdrRangeSpec;
#if USE_OPENSSL
namespace Ssl
{
class ServerBump;
}
#endif
/**
* Legacy Server code managing a connection to a client.
*
* NP: presents AsyncJob API but does not operate autonomously as a Job.
* So Must() is not safe to use.
*
* Multiple requests (up to pipeline_prefetch) can be pipelined.
* This object is responsible for managing which one is currently being
* fulfilled and what happens to the queue if the current one causes the client
* connection to be closed early.
*
* Act as a manager for the client connection and passes data in buffer to a
* Parser relevant to the state (message headers vs body) that is being
* processed.
*
* Performs HTTP message processing to kick off the actual HTTP request
* handling objects (Http::Stream, ClientHttpRequest, HttpRequest).
*
* Performs SSL-Bump processing for switching between HTTP and HTTPS protocols.
=== modified file 'src/comm.cc'
--- src/comm.cc 2017-01-01 00:12:22 +0000
+++ src/comm.cc 2017-01-30 13:46:37 +0000
@@ -889,67 +889,63 @@
PROF_start(comm_close);
F->flags.close_request = true;
#if USE_OPENSSL
if (F->ssl) {
AsyncCall::Pointer startCall=commCbCall(5,4, "commStartSslClose",
FdeCbPtrFun(commStartSslClose, NULL));
FdeCbParams &startParams = GetCommParams<FdeCbParams>(startCall);
startParams.fd = fd;
ScheduleCallHere(startCall);
}
#endif
// a half-closed fd may lack a reader, so we stop monitoring explicitly
if (commHasHalfClosedMonitor(fd))
commStopHalfClosedMonitor(fd);
commUnsetFdTimeout(fd);
// notify read/write handlers after canceling select reservations, if any
if (COMMIO_FD_WRITECB(fd)->active()) {
Comm::SetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
COMMIO_FD_WRITECB(fd)->finish(Comm::ERR_CLOSING, errno);
}
if (COMMIO_FD_READCB(fd)->active()) {
Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
COMMIO_FD_READCB(fd)->finish(Comm::ERR_CLOSING, errno);
}
#if USE_DELAY_POOLS
- if (ClientInfo *clientInfo = F->clientInfo) {
- if (clientInfo->selectWaiting) {
- clientInfo->selectWaiting = false;
- // kick queue or it will get stuck as commWriteHandle is not called
- clientInfo->kickQuotaQueue();
- }
- }
+ if (BandwidthBucket *bucket = BandwidthBucket::SelectBucket(F))
+ if (bucket->selectWaiting)
+ bucket->onFdClosed();
#endif
commCallCloseHandlers(fd);
comm_empty_os_read_buffers(fd);
AsyncCall::Pointer completeCall=commCbCall(5,4, "comm_close_complete",
FdeCbPtrFun(comm_close_complete, NULL));
FdeCbParams &completeParams = GetCommParams<FdeCbParams>(completeCall);
completeParams.fd = fd;
// must use async call to wait for all callbacks
// scheduled before comm_close() to finish
ScheduleCallHere(completeCall);
PROF_stop(comm_close);
}
/* Send a udp datagram to specified TO_ADDR. */
int
comm_udp_sendto(int fd,
const Ip::Address &to_addr,
const void *buf,
int len)
{
PROF_start(comm_udp_sendto);
++ statCounter.syscalls.sock.sendtos;
debugs(50, 3, "comm_udp_sendto: Attempt to send UDP packet to " << to_addr <<
" using FD " << fd << " using Port " << comm_local_port(fd) );
@@ -1324,195 +1320,197 @@
/// returns the reservation ID of the first descriptor to be dequeued
unsigned int
ClientInfo::quotaPeekReserv() const
{
assert(quotaQueue);
return quotaQueue->outs + 1;
}
/// queues a given fd, creating the queue if necessary; returns reservation ID
unsigned int
ClientInfo::quotaEnqueue(int fd)
{
assert(quotaQueue);
return quotaQueue->enqueue(fd);
}
/// removes queue head
void
ClientInfo::quotaDequeue()
{
assert(quotaQueue);
quotaQueue->dequeue();
}
void
ClientInfo::kickQuotaQueue()
{
if (!eventWaiting && !selectWaiting && hasQueue()) {
// wait at least a second if the bucket is empty
- const double delay = (bucketSize < 1.0) ? 1.0 : 0.0;
+ const double delay = (bucketLevel < 1.0) ? 1.0 : 0.0;
eventAdd("commHandleWriteHelper", &commHandleWriteHelper,
quotaQueue, delay, 0, true);
eventWaiting = true;
}
}
/// calculates how much to write for a single dequeued client
int
-ClientInfo::quotaForDequed()
+ClientInfo::quota()
{
/* If we have multiple clients and give full bucketSize to each client then
* clt1 may often get a lot more because clt1->clt2 time distance in the
* select(2) callback order may be a lot smaller than cltN->clt1 distance.
* We divide quota evenly to be more fair. */
if (!rationedCount) {
rationedCount = quotaQueue->size() + 1;
// The delay in ration recalculation _temporary_ deprives clients from
// bytes that should have trickled in while rationedCount was positive.
refillBucket();
// Rounding errors do not accumulate here, but we round down to avoid
// negative bucket sizes after write with rationedCount=1.
- rationedQuota = static_cast<int>(floor(bucketSize/rationedCount));
+ rationedQuota = static_cast<int>(floor(bucketLevel/rationedCount));
debugs(77,5, HERE << "new rationedQuota: " << rationedQuota <<
'*' << rationedCount);
}
--rationedCount;
debugs(77,7, HERE << "rationedQuota: " << rationedQuota <<
" rations remaining: " << rationedCount);
// update 'last seen' time to prevent clientdb GC from dropping us
last_seen = squid_curtime;
return rationedQuota;
}
-///< adds bytes to the quota bucket based on the rate and passed time
-void
-ClientInfo::refillBucket()
-{
- // all these times are in seconds, with double precision
- const double currTime = current_dtime;
- const double timePassed = currTime - prevTime;
-
- // Calculate allowance for the time passed. Use double to avoid
- // accumulating rounding errors for small intervals. For example, always
- // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error.
- const double gain = timePassed * writeSpeedLimit;
-
- debugs(77,5, HERE << currTime << " clt" << (const char*)hash.key << ": " <<
- bucketSize << " + (" << timePassed << " * " << writeSpeedLimit <<
- " = " << gain << ')');
-
- // to further combat error accumulation during micro updates,
- // quit before updating time if we cannot add at least one byte
- if (gain < 1.0)
- return;
-
- prevTime = currTime;
-
- // for "first" connections, drain initial fat before refilling but keep
- // updating prevTime to avoid bursts after the fat is gone
- if (bucketSize > bucketSizeLimit) {
- debugs(77,4, HERE << "not refilling while draining initial fat");
- return;
- }
-
- bucketSize += gain;
-
- // obey quota limits
- if (bucketSize > bucketSizeLimit)
- bucketSize = bucketSizeLimit;
+bool
+ClientInfo::applyQuota(int &nleft, Comm::IoCallback *state)
+{
+ assert(hasQueue());
+ assert(quotaPeekFd() == state->conn->fd);
+ quotaDequeue(); // we will write or requeue below
+ if (nleft > 0 && !BandwidthBucket::applyQuota(nleft, state)) {
+ state->quotaQueueReserv = quotaEnqueue(state->conn->fd);
+ kickQuotaQueue();
+ return false;
+ }
+ return true;
+}
+
+void
+ClientInfo::scheduleWrite(Comm::IoCallback *state)
+{
+ if (writeLimitingActive) {
+ state->quotaQueueReserv = quotaEnqueue(state->conn->fd);
+ kickQuotaQueue();
+ }
+}
+
+void
+ClientInfo::onFdClosed()
+{
+ BandwidthBucket::onFdClosed();
+ // kick queue or it will get stuck as commWriteHandle is not called
+ kickQuotaQueue();
+}
+
+void
+ClientInfo::reduceBucket(const int len)
+{
+ if (len > 0)
+ BandwidthBucket::reduceBucket(len);
+ // even if we wrote nothing, we were served; give others a chance
+ kickQuotaQueue();
}
void
ClientInfo::setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark)
{
- debugs(77,5, HERE << "Write limits for " << (const char*)hash.key <<
+ debugs(77,5, "Write limits for " << (const char*)key <<
" speed=" << aWriteSpeedLimit << " burst=" << anInitialBurst <<
" highwatermark=" << aHighWatermark);
// set or possibly update traffic shaping parameters
writeLimitingActive = true;
writeSpeedLimit = aWriteSpeedLimit;
bucketSizeLimit = aHighWatermark;
// but some members should only be set once for a newly activated bucket
if (firstTimeConnection) {
firstTimeConnection = false;
assert(!selectWaiting);
assert(!quotaQueue);
quotaQueue = new CommQuotaQueue(this);
- bucketSize = anInitialBurst;
+ bucketLevel = anInitialBurst;
prevTime = current_dtime;
}
}
CommQuotaQueue::CommQuotaQueue(ClientInfo *info): clientInfo(info),
ins(0), outs(0)
{
assert(clientInfo);
}
CommQuotaQueue::~CommQuotaQueue()
{
assert(!clientInfo); // ClientInfo should clear this before destroying us
}
/// places the given fd at the end of the queue; returns reservation ID
unsigned int
CommQuotaQueue::enqueue(int fd)
{
- debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+ debugs(77,5, "clt" << (const char*)clientInfo->key <<
": FD " << fd << " with qqid" << (ins+1) << ' ' << fds.size());
fds.push_back(fd);
return ++ins;
}
/// removes queue head
void
CommQuotaQueue::dequeue()
{
assert(!fds.empty());
- debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+ debugs(77,5, "clt" << (const char*)clientInfo->key <<
": FD " << fds.front() << " with qqid" << (outs+1) << ' ' <<
fds.size());
fds.pop_front();
++outs;
}
-#endif
+#endif /* USE_DELAY_POOLS */
/*
* hm, this might be too general-purpose for all the places we'd
* like to use it.
*/
int
ignoreErrno(int ierrno)
{
switch (ierrno) {
case EINPROGRESS:
case EWOULDBLOCK:
#if EAGAIN != EWOULDBLOCK
case EAGAIN:
#endif
case EALREADY:
case EINTR:
#ifdef ERESTART
case ERESTART:
#endif
return 1;
default:
return 0;
@@ -1566,61 +1564,70 @@
return false;
}
static bool
writeTimedOut(int fd)
{
if (!COMMIO_FD_WRITECB(fd)->active())
return false;
if ((squid_curtime - fd_table[fd].writeStart) < Config.Timeout.write)
return false;
return true;
}
void
checkTimeouts(void)
{
int fd;
fde *F = NULL;
AsyncCall::Pointer callback;
for (fd = 0; fd <= Biggest_FD; ++fd) {
F = &fd_table[fd];
if (writeTimedOut(fd)) {
// We have an active write callback and we are timed out
debugs(5, 5, "checkTimeouts: FD " << fd << " auto write timeout");
Comm::SetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
COMMIO_FD_WRITECB(fd)->finish(Comm::COMM_ERROR, ETIMEDOUT);
- } else if (AlreadyTimedOut(F))
+#if USE_DELAY_POOLS
+ } else if (F->writeQuotaHandler != nullptr && COMMIO_FD_WRITECB(fd)->conn != nullptr) {
+ if (!F->writeQuotaHandler->selectWaiting && F->writeQuotaHandler->quota() && !F->closing()) {
+ F->writeQuotaHandler->selectWaiting = true;
+ Comm::SetSelect(fd, COMM_SELECT_WRITE, Comm::HandleWrite, COMMIO_FD_WRITECB(fd), 0);
+ }
+ continue;
+#endif
+ }
+ else if (AlreadyTimedOut(F))
continue;
debugs(5, 5, "checkTimeouts: FD " << fd << " Expired");
if (F->timeoutHandler != NULL) {
debugs(5, 5, "checkTimeouts: FD " << fd << ": Call timeout handler");
callback = F->timeoutHandler;
F->timeoutHandler = NULL;
ScheduleCallHere(callback);
} else {
debugs(5, 5, "checkTimeouts: FD " << fd << ": Forcing comm_close()");
comm_close(fd);
}
}
}
/// Start waiting for a possibly half-closed connection to close
// by scheduling a read callback to a monitoring handler that
// will close the connection on read errors.
void
commStartHalfClosedMonitor(int fd)
{
debugs(5, 5, HERE << "adding FD " << fd << " to " << *TheHalfClosed);
assert(isOpen(fd) && !commHasHalfClosedMonitor(fd));
(void)TheHalfClosed->add(fd); // could also assert the result
commPlanHalfClosedCheck(); // may schedule check if we added the first FD
}
static
void
=== modified file 'src/comm/IoCallback.cc'
--- src/comm/IoCallback.cc 2017-01-01 00:12:22 +0000
+++ src/comm/IoCallback.cc 2017-01-30 13:46:36 +0000
@@ -42,67 +42,63 @@
}
/**
* Configure Comm::Callback for I/O
*
* @param fd filedescriptor
* @param t IO callback type (read or write)
* @param cb callback
* @param buf buffer, if applicable
* @param func freefunc, if applicable
* @param sz buffer size
*/
void
Comm::IoCallback::setCallback(Comm::iocb_type t, AsyncCall::Pointer &cb, char *b, FREE *f, int sz)
{
assert(!active());
assert(type == t);
assert(cb != NULL);
callback = cb;
buf = b;
freefunc = f;
size = sz;
offset = 0;
}
void
Comm::IoCallback::selectOrQueueWrite()
{
#if USE_DELAY_POOLS
- // stand in line if there is one
- if (ClientInfo *clientInfo = fd_table[conn->fd].clientInfo) {
- if (clientInfo->writeLimitingActive) {
- quotaQueueReserv = clientInfo->quotaEnqueue(conn->fd);
- clientInfo->kickQuotaQueue();
- return;
- }
+ if (BandwidthBucket *bucket = BandwidthBucket::SelectBucket(&fd_table[conn->fd])) {
+ bucket->scheduleWrite(this);
+ return;
}
#endif
SetSelect(conn->fd, COMM_SELECT_WRITE, Comm::HandleWrite, this, 0);
}
void
Comm::IoCallback::cancel(const char *reason)
{
if (!active())
return;
callback->cancel(reason);
callback = NULL;
reset();
}
void
Comm::IoCallback::reset()
{
conn = NULL;
if (freefunc) {
freefunc(buf);
buf = NULL;
freefunc = NULL;
}
xerrno = 0;
#if USE_DELAY_POOLS
quotaQueueReserv = 0;
=== modified file 'src/comm/Loops.h'
--- src/comm/Loops.h 2017-01-01 00:12:22 +0000
+++ src/comm/Loops.h 2017-01-30 13:46:36 +0000
@@ -1,59 +1,56 @@
/*
* Copyright (C) 1996-2017 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
* Please see the COPYING and CONTRIBUTORS files for details.
*/
#ifndef _SQUID_SRC_COMM_LOOPS_H
#define _SQUID_SRC_COMM_LOOPS_H
#include "comm/Flag.h"
#include "comm/forward.h"
/* Comm layer select loops API.
*
* These API functions must be implemented by all FD IO loops used by Squid.
* Defines are provided short-term for legacy code. These will disappear soon.
*/
namespace Comm
{
/// Initialize the module on Squid startup
void SelectLoopInit(void);
-/// Mark an FD to be watched for its IO status.
-void SetSelect(int, unsigned int, PF *, void *, time_t);
-
/// reset/undo/unregister the watch for an FD which was set by Comm::SetSelect()
void ResetSelect(int);
/** Perform a select() or equivalent call.
* This is used by the main select loop engine to check for FD with IO available.
*/
Comm::Flag DoSelect(int);
void QuickPollRequired(void);
/**
* Max number of UDP messages to receive per call to the UDP receive poller.
* This is a per-port limit for ICP/HTCP ports.
* DNS has a separate limit.
*/
#if _SQUID_WINDOWS_
#define INCOMING_UDP_MAX 1
#else
#define INCOMING_UDP_MAX 15
#endif
/**
* Max number of DNS messages to receive per call to DNS read handler
*/
#if _SQUID_WINDOWS_
#define INCOMING_DNS_MAX 1
#else
#define INCOMING_DNS_MAX 15
#endif
=== modified file 'src/comm/Write.cc'
--- src/comm/Write.cc 2017-01-01 00:12:22 +0000
+++ src/comm/Write.cc 2017-01-30 13:46:37 +0000
@@ -1,152 +1,124 @@
/*
* Copyright (C) 1996-2017 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
* Please see the COPYING and CONTRIBUTORS files for details.
*/
#include "squid.h"
+#include "cbdata.h"
#include "comm/Connection.h"
#include "comm/IoCallback.h"
+#include "comm/Loops.h"
#include "comm/Write.h"
#include "fd.h"
#include "fde.h"
#include "globals.h"
#include "MemBuf.h"
#include "profiler/Profiler.h"
#include "SquidTime.h"
#include "StatCounters.h"
#if USE_DELAY_POOLS
#include "ClientInfo.h"
#endif
#include <cerrno>
void
Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback)
{
Comm::Write(conn, mb->buf, mb->size, callback, mb->freeFunc());
}
void
Comm::Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
{
debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback);
/* Make sure we are open, not closing, and not writing */
assert(fd_table[conn->fd].flags.open);
assert(!fd_table[conn->fd].closing());
Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd);
assert(!ccb->active());
fd_table[conn->fd].writeStart = squid_curtime;
ccb->conn = conn;
/* Queue the write */
ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size);
ccb->selectOrQueueWrite();
}
/** Write to FD.
* This function is used by the lowest level of IO loop which only has access to FD numbers.
* We have to use the comm iocb_table to map FD numbers to waiting data and Comm::Connections.
* Once the write has been concluded we schedule the waiting call with success/fail results.
*/
void
Comm::HandleWrite(int fd, void *data)
{
Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data);
int len = 0;
int nleft;
- assert(state->conn != NULL && state->conn->fd == fd);
+ assert(state->conn != NULL);
+ assert(state->conn->fd == fd);
PROF_start(commHandleWrite);
debugs(5, 5, HERE << state->conn << ": off " <<
(long int) state->offset << ", sz " << (long int) state->size << ".");
nleft = state->size - state->offset;
#if USE_DELAY_POOLS
- ClientInfo * clientInfo=fd_table[fd].clientInfo;
-
- if (clientInfo && !clientInfo->writeLimitingActive)
- clientInfo = NULL; // we only care about quota limits here
-
- if (clientInfo) {
- assert(clientInfo->selectWaiting);
- clientInfo->selectWaiting = false;
-
- assert(clientInfo->hasQueue());
- assert(clientInfo->quotaPeekFd() == fd);
- clientInfo->quotaDequeue(); // we will write or requeue below
-
- if (nleft > 0) {
- const int quota = clientInfo->quotaForDequed();
- if (!quota) { // if no write quota left, queue this fd
- state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
- clientInfo->kickQuotaQueue();
- PROF_stop(commHandleWrite);
- return;
- }
-
- const int nleft_corrected = min(nleft, quota);
- if (nleft != nleft_corrected) {
- debugs(5, 5, HERE << state->conn << " writes only " <<
- nleft_corrected << " out of " << nleft);
- nleft = nleft_corrected;
- }
-
+ BandwidthBucket *bucket = BandwidthBucket::SelectBucket(&fd_table[fd]);
+ if (bucket) {
+ assert(bucket->selectWaiting);
+ bucket->selectWaiting = false;
+ if (nleft > 0 && !bucket->applyQuota(nleft, state)) {
+ PROF_stop(commHandleWrite);
+ return;
}
}
#endif /* USE_DELAY_POOLS */
/* actually WRITE data */
int xerrno = errno = 0;
len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
xerrno = errno;
debugs(5, 5, HERE << "write() returns " << len);
#if USE_DELAY_POOLS
- if (clientInfo) {
- if (len > 0) {
- /* we wrote data - drain them from bucket */
- clientInfo->bucketSize -= len;
- if (clientInfo->bucketSize < 0.0) {
- debugs(5, DBG_IMPORTANT, HERE << "drained too much"); // should not happen
- clientInfo->bucketSize = 0;
- }
- }
-
- // even if we wrote nothing, we were served; give others a chance
- clientInfo->kickQuotaQueue();
+ if (bucket) {
+ /* drain the written bytes, if any, from the bucket */
+ bucket->reduceBucket(len);
}
#endif /* USE_DELAY_POOLS */
fd_bytes(fd, len, FD_WRITE);
++statCounter.syscalls.sock.writes;
// After each successful partial write,
// reset fde::writeStart to the current time.
fd_table[fd].writeStart = squid_curtime;
if (len == 0) {
/* Note we even call write if nleft == 0 */
/* We're done */
if (nleft != 0)
debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining.");
state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, 0);
} else if (len < 0) {
/* An error */
if (fd_table[fd].flags.socket_eof) {
debugs(50, 2, "FD " << fd << " write failure: " << xstrerr(xerrno) << ".");
state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, xerrno);
} else if (ignoreErrno(xerrno)) {
debugs(50, 9, "FD " << fd << " write failure: " << xstrerr(xerrno) << ".");
state->selectOrQueueWrite();
} else {
debugs(50, 2, "FD " << fd << " write failure: " << xstrerr(xerrno) << ".");
state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, xerrno);
}
} else {
/* A successful write, continue */
=== modified file 'src/comm/Write.h'
--- src/comm/Write.h 2017-01-01 00:12:22 +0000
+++ src/comm/Write.h 2017-01-30 13:46:37 +0000
@@ -7,37 +7,34 @@
*/
#ifndef _SQUID_COMM_IOWRITE_H
#define _SQUID_COMM_IOWRITE_H
#include "base/AsyncCall.h"
#include "comm/forward.h"
#include "mem/forward.h"
class MemBuf;
namespace Comm
{
/**
* Queue a write. callback is scheduled when the write
* completes, on error, or on file descriptor close.
*
* free_func is used to free the passed buffer when the write has completed.
*/
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func);
/**
* Queue a write. callback is scheduled when the write
* completes, on error, or on file descriptor close.
*/
void Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback);
/// Cancel the write pending on FD. No action if none pending.
void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason);
-// callback handler to process an FD which is available for writing.
-extern PF HandleWrite;
-
} // namespace Comm
#endif /* _SQUID_COMM_IOWRITE_H */
=== modified file 'src/comm/forward.h'
--- src/comm/forward.h 2017-01-01 00:12:22 +0000
+++ src/comm/forward.h 2017-01-30 13:46:37 +0000
@@ -1,36 +1,42 @@
/*
* Copyright (C) 1996-2017 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
* Please see the COPYING and CONTRIBUTORS files for details.
*/
#ifndef _SQUID_COMM_FORWARD_H
#define _SQUID_COMM_FORWARD_H
#include "base/RefCount.h"
#include <vector>
+/// legacy CBDATA callback functions ABI definition for read or write I/O events
+/// \deprecated use CommCalls API instead where possible
+typedef void PF(int, void *);
+
/// Abstraction layer for TCP, UDP, TLS, UDS and filedescriptor sockets.
namespace Comm
{
class Connection;
class ConnOpener;
typedef RefCount<Comm::Connection> ConnectionPointer;
typedef std::vector<Comm::ConnectionPointer> ConnectionList;
bool IsConnOpen(const Comm::ConnectionPointer &conn);
+// callback handler to process an FD which is available for writing.
+PF HandleWrite;
+
+/// Mark an FD to be watched for its IO status.
+void SetSelect(int, unsigned int, PF *, void *, time_t);
+
}; // namespace Comm
-/// legacy CBDATA callback functions ABI definition for read or write I/O events
-/// \deprecated use CommCalls API instead where possible
-typedef void PF(int, void *);
-
#endif /* _SQUID_COMM_FORWARD_H */
=== modified file 'src/fde.h'
--- src/fde.h 2017-01-01 00:12:22 +0000
+++ src/fde.h 2017-01-30 13:46:42 +0000
@@ -1,49 +1,50 @@
/*
* Copyright (C) 1996-2017 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
* Please see the COPYING and CONTRIBUTORS files for details.
*/
#ifndef SQUID_FDE_H
#define SQUID_FDE_H
#include "comm.h"
#include "defines.h"
#include "ip/Address.h"
#include "ip/forward.h"
#include "security/forward.h"
#include "typedefs.h" //DRCB, DWCB
#if USE_DELAY_POOLS
+#include "MessageBucket.h"
class ClientInfo;
#endif
/**
* READ_HANDLER functions return < 0 if, and only if, they fail with an error.
* On error, they must pass back an error code in 'errno'.
*/
typedef int READ_HANDLER(int, char *, int);
/**
* WRITE_HANDLER functions return < 0 if, and only if, they fail with an error.
* On error, they must pass back an error code in 'errno'.
*/
typedef int WRITE_HANDLER(int, const char *, int);
class dwrite_q;
class _fde_disk
{
public:
DWCB *wrt_handle;
void *wrt_handle_data;
dwrite_q *write_q;
dwrite_q *write_q_tail;
off_t offset;
_fde_disk() { memset(this, 0, sizeof(_fde_disk)); }
};
class fde
{
@@ -76,110 +77,112 @@
See also nfmarkFromServer. */
int sock_family;
char ipaddr[MAX_IPSTRLEN]; /* dotted decimal address of peer */
char desc[FD_DESC_SZ];
struct _fde_flags {
bool open;
bool close_request; ///< true if file_ or comm_close has been called
bool write_daemon;
bool socket_eof;
bool nolinger;
bool nonblocking;
bool ipc;
bool called_connect;
bool nodelay;
bool close_on_exec;
bool read_pending;
//bool write_pending; //XXX seems not to be used
bool transparent;
} flags;
int64_t bytes_read;
int64_t bytes_written;
struct {
int uses; /* ie # req's over persistent conn */
} pconn;
#if USE_DELAY_POOLS
ClientInfo * clientInfo;/* pointer to client info used in client write limiter or NULL if not present */
+ MessageBucket::Pointer writeQuotaHandler; ///< response write limiter, if configured
#endif
unsigned epoll_state;
_fde_disk disk;
PF *read_handler;
void *read_data;
PF *write_handler;
void *write_data;
AsyncCall::Pointer timeoutHandler;
time_t timeout;
time_t writeStart;
void *lifetime_data;
AsyncCall::Pointer closeHandler;
AsyncCall::Pointer halfClosedReader; /// read handler for half-closed fds
READ_HANDLER *read_method;
WRITE_HANDLER *write_method;
Security::SessionPointer ssl;
Security::ContextPointer dynamicTlsContext; ///< cached and then freed when fd is closed
#if _SQUID_WINDOWS_
struct {
long handle;
} win32;
#endif
tos_t tosFromServer; /**< Stores the TOS flags of the packets from the remote server.
See FwdState::dispatch(). Note that this differs to
tosToServer in that this is the value we *receive* from the,
connection, whereas tosToServer is the value to set on packets
*leaving* Squid. */
unsigned int nfmarkFromServer; /**< Stores the Netfilter mark value of the connection from the remote
server. See FwdState::dispatch(). Note that this differs to
nfmarkToServer in that this is the value we *receive* from the,
connection, whereas nfmarkToServer is the value to set on packets
*leaving* Squid. */
/** Clear the fde class back to NULL equivalent. */
inline void clear() {
type = 0;
remote_port = 0;
local_addr.setEmpty();
tosToServer = '\0';
nfmarkToServer = 0;
sock_family = 0;
memset(ipaddr, '\0', MAX_IPSTRLEN);
memset(desc,'\0',FD_DESC_SZ);
memset(&flags,0,sizeof(_fde_flags));
bytes_read = 0;
bytes_written = 0;
pconn.uses = 0;
#if USE_DELAY_POOLS
clientInfo = NULL;
+ writeQuotaHandler = NULL;
#endif
epoll_state = 0;
read_handler = NULL;
read_data = NULL;
write_handler = NULL;
write_data = NULL;
timeoutHandler = NULL;
timeout = 0;
writeStart = 0;
lifetime_data = NULL;
closeHandler = NULL;
halfClosedReader = NULL;
read_method = NULL;
write_method = NULL;
ssl.reset();
dynamicTlsContext.reset();
#if _SQUID_WINDOWS_
win32.handle = (long)NULL;
#endif
tosFromServer = '\0';
nfmarkFromServer = 0;
}
};
#define fd_table fde::Table
int fdNFree(void);
#define FD_READ_METHOD(fd, buf, len) (*fd_table[fd].read_method)(fd, buf, len)
#define FD_WRITE_METHOD(fd, buf, len) (*fd_table[fd].write_method)(fd, buf, len)
=== modified file 'src/http/Stream.cc'
--- src/http/Stream.cc 2017-01-01 00:12:22 +0000
+++ src/http/Stream.cc 2017-01-30 13:46:49 +0000
@@ -1,45 +1,51 @@
/*
* Copyright (C) 1996-2017 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
* Please see the COPYING and CONTRIBUTORS files for details.
*/
#include "squid.h"
#include "client_side_request.h"
#include "http/Stream.h"
#include "HttpHdrContRange.h"
#include "HttpHeaderTools.h"
#include "Store.h"
#include "TimeOrTag.h"
+#if USE_DELAY_POOLS
+#include "acl/FilledChecklist.h"
+#include "ClientInfo.h"
+#include "fde.h"
+#include "MessageDelayPools.h"
+#endif
Http::Stream::Stream(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
clientConnection(aConn),
http(aReq),
reply(nullptr),
writtenToSocket(0),
mayUseConnection_(false),
connRegistered_(false)
{
assert(http != nullptr);
memset(reqbuf, '\0', sizeof (reqbuf));
flags.deferred = 0;
flags.parsed_ok = 0;
deferredparams.node = nullptr;
deferredparams.rep = nullptr;
}
Http::Stream::~Stream()
{
if (auto node = getTail()) {
if (auto ctx = dynamic_cast<Http::Stream *>(node->data.getRaw())) {
/* We are *always* the tail - prevent recursive free */
assert(this == ctx);
node->data = nullptr;
}
}
httpRequestFree(http);
}
void
@@ -256,60 +262,78 @@
}
void
Http::Stream::sendStartOfMessage(HttpReply *rep, StoreIOBuffer bodyData)
{
prepareReply(rep);
assert(rep);
MemBuf *mb = rep->pack();
// dump now, so we dont output any body.
debugs(11, 2, "HTTP Client " << clientConnection);
debugs(11, 2, "HTTP Client REPLY:\n---------\n" << mb->buf << "\n----------");
/* Save length of headers for persistent conn checks */
http->out.headers_sz = mb->contentSize();
#if HEADERS_LOG
headersLog(0, 0, http->request->method, rep);
#endif
if (bodyData.data && bodyData.length) {
if (multipartRangeRequest())
packRange(bodyData, mb);
else if (http->request->flags.chunkedReply) {
packChunk(bodyData, *mb);
} else {
size_t length = lengthToSend(bodyData.range());
noteSentBodyBytes(length);
mb->append(bodyData.data, length);
}
}
+#if USE_DELAY_POOLS
+ for (const auto &pool: MessageDelayPools::Instance()->pools) {
+ if (pool->access) {
+ std::unique_ptr<ACLFilledChecklist> chl(clientAclChecklistCreate(pool->access, http));
+ chl->reply = rep;
+ HTTPMSGLOCK(chl->reply);
+ const allow_t answer = chl->fastCheck();
+ if (answer == ACCESS_ALLOWED) {
+ writeQuotaHandler = pool->createBucket();
+ fd_table[clientConnection->fd].writeQuotaHandler = writeQuotaHandler;
+ break;
+ } else {
+ debugs(83, 4, "Response delay pool " << pool->poolName <<
+ " skipped because ACL " << answer);
+ }
+ }
+ }
+#endif
getConn()->write(mb);
delete mb;
}
void
Http::Stream::sendBody(StoreIOBuffer bodyData)
{
if (!multipartRangeRequest() && !http->request->flags.chunkedReply) {
size_t length = lengthToSend(bodyData.range());
noteSentBodyBytes(length);
getConn()->write(bodyData.data, length);
return;
}
MemBuf mb;
mb.init();
if (multipartRangeRequest())
packRange(bodyData, &mb);
else
packChunk(bodyData, mb);
if (mb.contentSize())
getConn()->write(&mb);
else
writeComplete(0);
}
size_t
Http::Stream::lengthToSend(Range<int64_t> const &available) const
=== modified file 'src/http/Stream.h'
--- src/http/Stream.h 2017-01-01 00:12:22 +0000
+++ src/http/Stream.h 2017-01-30 13:46:49 +0000
@@ -1,44 +1,47 @@
/*
* Copyright (C) 1996-2017 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
* Please see the COPYING and CONTRIBUTORS files for details.
*/
#ifndef SQUID_SRC_HTTP_STREAM_H
#define SQUID_SRC_HTTP_STREAM_H
#include "http/forward.h"
#include "mem/forward.h"
#include "StoreIOBuffer.h"
+#if USE_DELAY_POOLS
+#include "MessageBucket.h"
+#endif
class clientStreamNode;
class ClientHttpRequest;
namespace Http
{
/**
* The processing context for a single HTTP transaction (stream).
*
* A stream lifetime extends from directly after a request has been parsed
* off the client connection buffer, until the last byte of both request
* and reply payload (if any) have been written, or it is otherwise
* explicitly terminated.
*
* Streams self-register with the Http::Server Pipeline being managed by the
* Server for the connection on which the request was received.
*
* The socket level management and I/O is done by a Server which owns us.
* The scope of this objects control over a socket consists of the data
* buffer received from the Server with an initially unknown length.
* When that length is known it sets the end boundary of our access to the
* buffer.
*
* The individual processing actions are done by other Jobs which we start.
*
* When a stream is completed the finished() method needs to be called which
* will perform all cleanup and deregistration operations. If the reason for
* finishing is an error, then notifyIoError() needs to be called prior to
* the finished() method.
@@ -134,36 +137,39 @@
unsigned parsed_ok:1; ///< Was this parsed correctly?
} flags;
bool mayUseConnection() const {return mayUseConnection_;}
void mayUseConnection(bool aBool) {
mayUseConnection_ = aBool;
debugs(33, 3, "This " << this << " marked " << aBool);
}
class DeferredParams
{
public:
clientStreamNode *node;
HttpReply *rep;
StoreIOBuffer queuedBuffer;
};
DeferredParams deferredparams;
int64_t writtenToSocket;
private:
void prepareReply(HttpReply *);
void packChunk(const StoreIOBuffer &bodyData, MemBuf &);
void packRange(StoreIOBuffer const &, MemBuf *);
void doClose();
bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */
bool connRegistered_;
+#if USE_DELAY_POOLS
+ MessageBucket::Pointer writeQuotaHandler; ///< response write limiter, if configured
+#endif
};
} // namespace Http
#endif /* SQUID_SRC_HTTP_STREAM_H */
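
For anyone following the refactoring without the full tree at hand, the bucket arithmetic that the patch moves out of ClientInfo can be sketched roughly as below. This is only an illustration under my own assumptions: SketchBucket and its refill(), ration() and reduce() methods are hypothetical names, not the BandwidthBucket class from the patch. The refill rule follows the removed ClientInfo::refillBucket() and the rationing follows ClientInfo::quota(). Clamping the level at zero in reduce() mirrors the removed "drained too much" check from Comm::HandleWrite().

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>

// Hypothetical, simplified stand-in for the bucket logic touched by the patch.
// Field names mirror the diff, but this is not Squid's BandwidthBucket class.
struct SketchBucket {
    double bucketLevel;     // bytes currently available for writing
    double bucketSizeLimit; // high watermark, in bytes
    double writeSpeedLimit; // refill rate, in bytes per second
    double prevTime;        // time of the last effective refill, in seconds

    // Adds the bytes earned since prevTime without exceeding the limit.
    void refill(const double now) {
        const double gain = (now - prevTime) * writeSpeedLimit;
        if (gain < 1.0)
            return; // too small: keep prevTime so the fraction is not lost
        prevTime = now;
        if (bucketLevel > bucketSizeLimit)
            return; // still draining the initial burst
        bucketLevel = std::min(bucketLevel + gain, bucketSizeLimit);
    }

    // Splits the current level evenly among the waiting writers, rounding down.
    int ration(const int writers) const {
        assert(writers > 0);
        return static_cast<int>(std::floor(bucketLevel / writers));
    }

    // Removes written bytes; a failed or empty write (len <= 0) changes nothing.
    void reduce(const int len) {
        if (len > 0)
            bucketLevel = std::max(0.0, bucketLevel - len);
    }
};
```

With a rate of 100 bytes/s, a refill after 5 ms adds nothing and leaves prevTime untouched, so the fractional gain is carried over to the next call instead of being rounded away.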