On 22.01.2017 22:52, Amos Jeffries wrote:

> as I understood it the existing delay pools design is that multiple
> pools can apply to traffic. In which case on each write() attempt the
> bucket with smallest available amount determins the write size and all
> buckets have the actually consumed amount removed.

I assume that you are mixing up "pools" and "buckets". A pool may have
several "cascading" buckets: the existing server pools have several
buckets for classes > 1. We can configure several server delay pools,
but only the first 'matched' pool is assigned to the transaction. Then
the "smallest" bucket of the matched pool determines the final speed
limit (as you correctly noted). Following this logic, the old "client"
and the new "response" pools should be independent (as if they
represented different client pool classes), i.e., in the end we should
select only one "first matched" client-side delay pool. Within this
selected pool, we select the bucket with the smallest level (individual
or aggregate), similar to what we do for server pools of classes 2 or
3. However, we skip this selection if the old client pool matched,
because it has only one bucket.
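
To illustrate the intended selection order, here is a minimal
standalone C++ sketch (hypothetical types and names, not code from
the patch):

#include <algorithm>
#include <vector>

struct Bucket { double level; };
struct Pool { bool matched; std::vector<Bucket*> buckets; };

// pick the first matched pool; within it, the bucket with the
// smallest available level determines the final speed limit
Bucket *selectBucket(const std::vector<Pool> &pools)
{
    for (const auto &pool : pools) {
        if (!pool.matched)
            continue; // only the first matched pool applies
        const auto it = std::min_element(pool.buckets.begin(),
                                         pool.buckets.end(),
                                         [](const Bucket *a, const Bucket *b) {
                                             return a->level < b->level;
                                         });
        return it == pool.buckets.end() ? nullptr : *it;
    }
    return nullptr; // no pool matched: no client-side limiting
}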

> ClientDelayConfig::parsePoolCount() please use emplace_back instead of
> push_back.

AFAIK, emplace_back() has a benefit over push_back() only when the
inserted element is an object itself (constructing in place avoids
copying the object). In this case a pointer is inserted, so the
benefit is negligible. I would prefer to keep the old push_back() here.
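
For illustration, a minimal standalone sketch (hypothetical type, not
patch code) of why emplace_back() helps for objects but not for
pointers:

#include <vector>

struct ClientDelayPool { int rate = 0; };

int main()
{
    std::vector<ClientDelayPool> objects;
    // constructs the element in place: no temporary, no copy
    objects.emplace_back();

    std::vector<ClientDelayPool *> pointers;
    // only the pointer value is stored; there is nothing to construct
    // in place, so emplace_back() would not buy us anything here
    pointers.push_back(new ClientDelayPool());

    delete pointers.back();
    return 0;
}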


> is there a specific reason this has to be a list of raw-pointers
> instead of a list of objects like it was?

This change was a consequence of another fix: the
ClientDelayPool::access member was not destroyed on shutdown, causing
"indirectly lost" Valgrind error messages. The ClientDelayPool class
does not follow copy semantics (and probably should not) because of its
acl_access pointer, whereas STL vector requires copyable elements. So I
had to use pointers instead.
However, my latest checks with test-builds.sh showed another linker
problem: SquidConfig got an unnecessary dependency on the
ClientDelayConfig destructor. In the end I had to rework the
ClientDelayConfig module, introducing a new ClientDelayPools class (a
storage for ClientDelayPool objects) and making ClientDelayPool
ref-countable.

>  which brings up a design contradiction: "why are there required
> 'options'?
> so the answer is that they should not be required, the value -1 in
> pools just means "no limit" or "infinity".
> - the for-loop checking for missing options should be removed and use
> of the "none" magic word as value for these options should be supported.

I agree that we should not require using all response_delay_pool
options.  For example, it would be useful to allow only "individual" or
"aggregate" options, e.g.:

#no aggregate limiting
response_delay_pool slowPool1  individual-restore=50000 \
                                  individual-maximum=100000

#no individual limiting, split aggregate bandwidth among clients
response_delay_pool slowPool2  aggregate-restore=100000  \
                                  aggregate-maximum=200000

No need for "none" world: just remove unneeded options (or specify -1
value). Using approach the existing server delay pools
adhere to ("none" for speed/maximum pair), we should not use "restore"
option without "maximum" for both "aggregate" and "individual" cases.
BTW, it is possible to specify "no limitation" at all for a pool
(i.e., no individual and aggregate limitation). The existing server
pools work similarly, allowing to specify "none" for all buckets
(without any warning message).
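
For example (an illustrative sketch; the pool name is mine), such a
no-limit pool could be configured with a bare name:

#neither individual nor aggregate limiting
response_delay_pool unlimitedPool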

> The limitation configured with initial_fill_level= was previously
> configured through directive client_delay_initial_bucket_level.

"client_delay_initial_bucket_level" relates to only old client delay
pools.  I don't think we should reuse it in the new response pools: one
may need to configure both pools with different initial level parameter.
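
For example (an illustrative sketch; values and the pool name are
mine):

# 50% initial level for the old client-side pools
client_delay_initial_bucket_level 50

# 75% initial level for a new response pool
response_delay_pool slowPool3  individual-restore=50000 \
                               individual-maximum=100000 \
                               initial-bucket-level=75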


Thanks for the detailed review. I tried to address all the other
remarks, renamed parameters according to the suggested terminology,
merged with the latest v5 (r15027), and re-attached the patch.


Eduard.

Added response delay pools feature for Squid-to-client speed limiting.

The feature restricts Squid-to-client bandwidth only.  It applies to
both cache hits and misses.

  * Rationale *

  This may be useful for limiting the bandwidth of specific responses.
  Doing this by means of netfilter/iptables, which operate on TCP/IP
  packets and IP address information, is sometimes hard or impossible.
  In other words, it can be problematic to 'extract' a single response
  from the TCP/IP data flow at the system level. For example, a single
  Squid-to-client TCP connection can transmit multiple responses
  (persistent connections, pipelining, or HTTP/2 connection
  multiplexing) or be encrypted (HTTPS proxy mode).

  * Description *

  When Squid starts delivering the final HTTP response to a client,
  Squid checks response_delay_pool_access rules (supporting fast ACLs
  only), in the order they were declared. The first rule with a
  matching ACL wins.  If (and only if) an "allow" rule won, Squid
  assigns the response to the corresponding named delay pool.
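
  For example (a hypothetical squid.conf sketch; the ACL and pool
  names are mine, not part of this patch):

    acl isoFiles urlpath_regex -i \.iso$
    response_delay_pool slowPool individual-restore=50000 \
                                 individual-maximum=100000
    response_delay_pool_access slowPool allow isoFiles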

  If a response is assigned to a delay pool, the response becomes
  subject to the configured individual and aggregate bandwidth limits
  of that pool, similar to the current "class 2" server-side delay
  pools, but with a brand-new, dedicated, initially filled
  "individual" bucket assigned to the matched response.

  The new feature serves the same purpose as the existing client-side
  pools: both features limit Squid-to-client bandwidth. Their common
  interface was placed into a new BandwidthBucket base class. The
  difference is that client-side pools do not aggregate clients and
  always use one bucket per client IP. A response may become subject
  to both kinds of pools; in such situations, only the matched
  response delay pool is used for Squid-to-client speed limiting.

  * Limitations *

  Accurate SMP support (with the aggregate bucket shared among
  workers) is outside this patch's scope. In SMP configurations,
  Squid should automatically divide the aggregate-restore and
  aggregate-maximum values among the configured number of Squid
  workers.
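
  For example, with aggregate-restore=100000 bytes/second and four
  workers, each worker's aggregate bucket would be refilled at 25000
  bytes/second.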

  * Also: *

  Fixed ClientDelayConfig, which did not perform cleanup on
  destruction, causing memory problems detected by Valgrind. It was
  not possible to fix this with minimal changes because of linker
  problems with SquidConfig revealed by test-builds.sh. So I had to
  refactor the ClientDelayConfig module, separating configuration code
  (the old ClientDelayConfig class) from configured data (a new
  ClientDelayPools class) and minimizing dependencies on SquidConfig.

=== added file 'src/BandwidthBucket.cc'
--- src/BandwidthBucket.cc	1970-01-01 00:00:00 +0000
+++ src/BandwidthBucket.cc	2017-01-30 13:45:42 +0000
@@ -0,0 +1,107 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+
+#include "BandwidthBucket.h"
+#include "ClientInfo.h"
+#include "comm/Connection.h"
+#include "Debug.h"
+#include "fde.h"
+
+BandwidthBucket::BandwidthBucket(const int speed, const int initialLevelPercent, const double sizeLimit) :
+    bucketLevel( sizeLimit * (initialLevelPercent / 100.0)),
+    selectWaiting(false),
+    writeSpeedLimit(speed),
+    bucketSizeLimit(sizeLimit)
+{
+    getCurrentTime();
+    /* put current time to have something sensible here */
+    prevTime = current_dtime;
+}
+
+void
+BandwidthBucket::refillBucket()
+{
+    if (noLimit())
+        return;
+    // all these times are in seconds, with double precision
+    const double currTime = current_dtime;
+    const double timePassed = currTime - prevTime;
+
+    // Calculate allowance for the time passed. Use double to avoid
+    // accumulating rounding errors for small intervals. For example, always
+    // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error.
+    const double gain = timePassed * writeSpeedLimit;
+
+    // to further combat error accumulation during micro updates,
+    // quit before updating time if we cannot add at least one byte
+    if (gain < 1.0)
+        return;
+
+    prevTime = currTime;
+
+    // for "first" connections, drain initial fat before refilling but keep
+    // updating prevTime to avoid bursts after the fat is gone
+    if (bucketLevel > bucketSizeLimit) {
+        debugs(77, 4, "not refilling while draining initial fat");
+        return;
+    }
+
+    bucketLevel += gain;
+
+    // obey quota limits
+    if (bucketLevel > bucketSizeLimit)
+        bucketLevel = bucketSizeLimit;
+}
+
+bool
+BandwidthBucket::applyQuota(int &nleft, Comm::IoCallback *state)
+{
+    const int q = quota();
+    if (!q)
+        return false;
+    else if (q < 0)
+        return true;
+    const int nleft_corrected = min(nleft, q);
+    if (nleft != nleft_corrected) {
+        debugs(77, 5, state->conn << " writes only " <<
+               nleft_corrected << " out of " << nleft);
+        nleft = nleft_corrected;
+    }
+    return true;
+}
+
+void
+BandwidthBucket::reduceBucket(const int len)
+{
+    if (len <= 0 || noLimit())
+        return;
+    bucketLevel -= len;
+    if (bucketLevel < 0.0) {
+        debugs(77, DBG_IMPORTANT, "drained too much"); // should not happen
+        bucketLevel = 0;
+    }
+}
+
+BandwidthBucket *
+BandwidthBucket::SelectBucket(fde *f)
+{
+    BandwidthBucket *bucket = f->writeQuotaHandler.getRaw();
+    if (!bucket) {
+        ClientInfo *clientInfo = f->clientInfo;
+        if (clientInfo && clientInfo->writeLimitingActive)
+            bucket = clientInfo;
+    }
+    return bucket;
+}
+
+#endif /* USE_DELAY_POOLS */
+

=== added file 'src/BandwidthBucket.h'
--- src/BandwidthBucket.h	1970-01-01 00:00:00 +0000
+++ src/BandwidthBucket.h	2017-01-30 13:45:42 +0000
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef BANDWIDTHBUCKET_H
+#define BANDWIDTHBUCKET_H
+
+#if USE_DELAY_POOLS
+
+#include "comm/IoCallback.h"
+
+class fde;
+
+/// Base class for Squid-to-client bandwidth limiting
+class BandwidthBucket
+{
+public:
+    BandwidthBucket(const int speed, const int initialLevelPercent, const double sizeLimit);
+    virtual ~BandwidthBucket() {}
+
+    static BandwidthBucket *SelectBucket(fde *f);
+
+    /// \returns the number of bytes this bucket allows to write,
+    /// also considering aggregates, if any. Negative quota means
+    /// no limitations by this bucket.
+    virtual int quota() = 0;
+    /// Adjusts nleft to not exceed the current bucket quota value,
+    /// if needed.
+    virtual bool applyQuota(int &nleft, Comm::IoCallback *state);
+    /// Will plan another write call.
+    virtual void scheduleWrite(Comm::IoCallback *state) = 0;
+    /// Performs cleanup when the related file descriptor becomes closed.
+    virtual void onFdClosed() { selectWaiting = false; }
+    /// Decreases the bucket level.
+    virtual void reduceBucket(const int len);
+    /// Whether this bucket will not do bandwidth limiting.
+    bool noLimit() const { return writeSpeedLimit < 0; }
+
+protected:
+    /// Increases the bucket level with the writeSpeedLimit speed.
+    void refillBucket();
+
+public:
+    double bucketLevel; ///< how much can be written now
+    bool selectWaiting; ///< is between commSetSelect and commHandleWrite
+
+protected:
+    double prevTime; ///< previous time when we checked
+    double writeSpeedLimit; ///< Write speed limit in bytes per second.
+    double bucketSizeLimit; ///< maximum bucket size
+};
+
+#endif /* USE_DELAY_POOLS */
+
+#endif
+

=== modified file 'src/ClientDelayConfig.cc'
--- src/ClientDelayConfig.cc	2017-01-01 00:12:22 +0000
+++ src/ClientDelayConfig.cc	2017-01-30 13:45:43 +0000
@@ -1,103 +1,113 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #include "squid.h"
 #include "acl/Acl.h"
 #include "acl/Gadgets.h"
 #include "ClientDelayConfig.h"
 #include "ConfigParser.h"
 #include "Parsing.h"
 #include "Store.h"
 
+ClientDelayPool::~ClientDelayPool()
+{
+    if (access)
+        aclDestroyAccessList(&access);
+}
+
 void ClientDelayPool::dump(StoreEntry * entry, unsigned int poolNumberMinusOne) const
 {
     LOCAL_ARRAY(char, nom, 32);
     snprintf(nom, 32, "client_delay_access %d", poolNumberMinusOne + 1);
     dump_acl_access(entry, nom, access);
     storeAppendPrintf(entry, "client_delay_parameters %d %d %" PRId64 "\n", poolNumberMinusOne + 1, rate,highwatermark);
     storeAppendPrintf(entry, "\n");
 }
 
+ClientDelayPools *
+ClientDelayPools::Instance()
+{
+    static ClientDelayPools pools;
+    return &pools;
+}
+
+ClientDelayPools::~ClientDelayPools()
+{
+    pools.clear();
+}
+
 void
 ClientDelayConfig::finalize()
 {
-    for (unsigned int i = 0; i < pools.size(); ++i) {
+    for (unsigned int i = 0; i < pools().size(); ++i) {
         /* pools require explicit 'allow' to assign a client into them */
-        if (!pools[i].access) {
-            debugs(77, DBG_IMPORTANT, "client_delay_pool #" << (i+1) <<
+        if (!pool(i).access) {
+            debugs(77, DBG_IMPORTANT, "WARNING: client_delay_pool #" << (i+1) <<
                    " has no client_delay_access configured. " <<
                    "No client will ever use it.");
         }
     }
 }
 
-void ClientDelayConfig::freePoolCount()
-{
-    pools.clear();
-}
-
 void ClientDelayConfig::dumpPoolCount(StoreEntry * entry, const char *name) const
 {
-    if (pools.size()) {
-        storeAppendPrintf(entry, "%s %d\n", name, (int)pools.size());
-        for (unsigned int i = 0; i < pools.size(); ++i)
-            pools[i].dump(entry, i);
+    const auto &pools_ = ClientDelayPools::Instance()->pools;
+    if (pools_.size()) {
+        storeAppendPrintf(entry, "%s %d\n", name, static_cast<int>(pools_.size()));
+        for (unsigned int i = 0; i < pools_.size(); ++i)
+            pools_[i]->dump(entry, i);
     }
 }
 
+void
+ClientDelayConfig::freePools()
+{
+    pools().clear();
+}
+
 void ClientDelayConfig::parsePoolCount()
 {
-    if (pools.size()) {
-        debugs(3, DBG_CRITICAL, "parse_client_delay_pool_count: multiple client_delay_pools lines, aborting all previous client_delay_pools config");
-        clean();
+    if (pools().size()) {
+        debugs(3, DBG_CRITICAL, "parse_client_delay_pool_count: multiple client_delay_pools lines, " <<
+               "aborting all previous client_delay_pools config");
+        freePools();
     }
     unsigned short pools_;
     ConfigParser::ParseUShort(&pools_);
-    for (int i = 0; i < pools_; ++i) {
-        pools.push_back(ClientDelayPool());
-    }
+    for (int i = 0; i < pools_; ++i)
+        pools().push_back(new ClientDelayPool());
 }
 
 void ClientDelayConfig::parsePoolRates()
 {
-    unsigned short pool;
-    ConfigParser::ParseUShort(&pool);
-
-    if (pool < 1 || pool > pools.size()) {
-        debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " << pool << " not in 1 .. " << pools.size());
-        return;
+    if (unsigned short poolId = parsePoolId()) {
+        --poolId;
+        pool(poolId).rate = GetInteger();
+        pool(poolId).highwatermark = GetInteger64();
     }
-
-    --pool;
-
-    pools[pool].rate = GetInteger();
-    pools[pool].highwatermark = GetInteger64();
 }
 
 void ClientDelayConfig::parsePoolAccess(ConfigParser &parser)
 {
-    unsigned short pool;
-
-    ConfigParser::ParseUShort(&pool);
-
-    if (pool < 1 || pool > pools.size()) {
-        debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " << pool << " not in 1 .. " << pools.size());
-        return;
-    }
-
-    --pool;
-    aclParseAccessLine("client_delay_access", parser, &pools[pool].access);
+    if (const unsigned short poolId = parsePoolId())
+        aclParseAccessLine("client_delay_access", parser, &(pool(poolId-1).access));
 }
 
-void ClientDelayConfig::clean()
+unsigned short
+ClientDelayConfig::parsePoolId()
 {
-    for (unsigned int i = 0; i < pools.size(); ++i) {
-        aclDestroyAccessList(&pools[i].access);
+    unsigned short poolId = 0;
+    ConfigParser::ParseUShort(&poolId);
+    if (poolId < 1 || poolId > pools().size()) {
+        debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " <<
+               poolId << " not in 1 .. " << pools().size());
+        return 0;
     }
+    return poolId;
 }
 

=== modified file 'src/ClientDelayConfig.h'
--- src/ClientDelayConfig.h	2017-01-01 00:12:22 +0000
+++ src/ClientDelayConfig.h	2017-01-30 13:45:43 +0000
@@ -1,60 +1,83 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef SQUID_CLIENTDELAYCONFIG_H
 #define SQUID_CLIENTDELAYCONFIG_H
 
 #include "acl/forward.h"
+#include "base/RefCount.h"
 
 #include <vector>
 
 class StoreEntry;
 class ConfigParser;
 
 /// \ingroup DelayPoolsAPI
 
 /* represents one client write limiting delay 'pool' */
-class ClientDelayPool
+class ClientDelayPool : public RefCountable
 {
 public:
+    typedef RefCount<ClientDelayPool> Pointer;
+
     ClientDelayPool()
-        :   access(NULL), rate(0), highwatermark(0) {}
+        :   access(nullptr), rate(0), highwatermark(0) {}
+    ~ClientDelayPool();
+    ClientDelayPool(const ClientDelayPool &) = delete;
+    ClientDelayPool &operator=(const ClientDelayPool &) = delete;
+
     void dump (StoreEntry * entry, unsigned int poolNumberMinusOne) const;
     acl_access *access;
     int rate;
     int64_t highwatermark;
 };
 
-typedef std::vector<ClientDelayPool> ClientDelayPools;
+class ClientDelayPools
+{
+public:
+    ClientDelayPools(const ClientDelayPools &) = delete;
+    ClientDelayPools &operator=(const ClientDelayPools &) = delete;
+    static ClientDelayPools *Instance();
+
+    std::vector<ClientDelayPool::Pointer> pools;
+private:
+    ClientDelayPools() {}
+    ~ClientDelayPools();
+};
 
 /* represents configuration of client write limiting delay pools */
 class ClientDelayConfig
 {
 public:
     ClientDelayConfig()
         :   initial(50) {}
-    void freePoolCount();
+    ClientDelayConfig(const ClientDelayConfig &) = delete;
+    ClientDelayConfig &operator=(const ClientDelayConfig &) = delete;
+
+    void freePools();
     void dumpPoolCount(StoreEntry * entry, const char *name) const;
     /* parsing of client_delay_pools - number of pools */
     void parsePoolCount();
     /* parsing of client_delay_parameters lines */
     void parsePoolRates();
     /* parsing client_delay_access lines */
     void parsePoolAccess(ConfigParser &parser);
 
     void finalize(); ///< checks pools configuration
 
     /* initial bucket level, how fill bucket at startup */
     unsigned short initial;
-    ClientDelayPools pools;
+
 private:
-    void clean();
+    unsigned short parsePoolId();
+    std::vector<ClientDelayPool::Pointer> &pools() { return ClientDelayPools::Instance()->pools; }
+    ClientDelayPool &pool(const int i) { return *(ClientDelayPools::Instance()->pools.at(i)); }
 };
 
 #endif // SQUID_CLIENTDELAYCONFIG_H
 

=== modified file 'src/ClientInfo.h'
--- src/ClientInfo.h	2017-01-01 00:12:22 +0000
+++ src/ClientInfo.h	2017-01-30 13:45:43 +0000
@@ -1,113 +1,117 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef SQUID__SRC_CLIENTINFO_H
 #define SQUID__SRC_CLIENTINFO_H
 
+#if USE_DELAY_POOLS
+#include "BandwidthBucket.h"
+#endif
 #include "base/ByteCounter.h"
 #include "cbdata.h"
 #include "enums.h"
 #include "hash.h"
 #include "ip/Address.h"
 #include "LogTags.h"
 #include "mem/forward.h"
 #include "typedefs.h"
 
 #include <deque>
 
 #if USE_DELAY_POOLS
 class CommQuotaQueue;
 #endif
 
-class ClientInfo
+class ClientInfo : public hash_link
+#if USE_DELAY_POOLS
+    , public BandwidthBucket
+#endif
 {
     MEMPROXY_CLASS(ClientInfo);
 
 public:
     explicit ClientInfo(const Ip::Address &);
     ~ClientInfo();
 
-    hash_link hash;             /* must be first */
-
     Ip::Address addr;
 
     struct Protocol {
         Protocol() : n_requests(0) {
             memset(result_hist, 0, sizeof(result_hist));
         }
 
         int result_hist[LOG_TYPE_MAX];
         int n_requests;
         ByteCounter kbytes_in;
         ByteCounter kbytes_out;
         ByteCounter hit_kbytes_out;
     } Http, Icp;
 
     struct Cutoff {
         Cutoff() : time(0), n_req(0), n_denied(0) {}
 
         time_t time;
         int n_req;
         int n_denied;
     } cutoff;
     int n_established;          /* number of current established connections */
     time_t last_seen;
 #if USE_DELAY_POOLS
-    double writeSpeedLimit;///< Write speed limit in bytes per second, can be less than 1, if too close to zero this could result in timeouts from client
-    double prevTime; ///< previous time when we checked
-    double bucketSize; ///< how much can be written now
-    double bucketSizeLimit;  ///< maximum bucket size
     bool writeLimitingActive; ///< Is write limiter active
     bool firstTimeConnection;///< is this first time connection for this client
 
     CommQuotaQueue *quotaQueue; ///< clients waiting for more write quota
     int rationedQuota; ///< precomputed quota preserving fairness among clients
     int rationedCount; ///< number of clients that will receive rationedQuota
-    bool selectWaiting; ///< is between commSetSelect and commHandleWrite
     bool eventWaiting; ///< waiting for commHandleWriteHelper event to fire
 
     // all those functions access Comm fd_table and are defined in comm.cc
     bool hasQueue() const;  ///< whether any clients are waiting for write quota
     bool hasQueue(const CommQuotaQueue*) const;  ///< has a given queue
     unsigned int quotaEnqueue(int fd); ///< client starts waiting in queue; create the queue if necessary
     int quotaPeekFd() const; ///< retuns the next fd reservation
     unsigned int quotaPeekReserv() const; ///< returns the next reserv. to pop
     void quotaDequeue(); ///< pops queue head from queue
     void kickQuotaQueue(); ///< schedule commHandleWriteHelper call
-    int quotaForDequed(); ///< allocate quota for a just dequeued client
-    void refillBucket(); ///< adds bytes to bucket based on rate and time
+
+    /* BandwidthBucket API */
+    virtual int quota() override; ///< allocate quota for a just dequeued client
+    virtual bool applyQuota(int &nleft, Comm::IoCallback *state) override;
+    virtual void scheduleWrite(Comm::IoCallback *state) override;
+    virtual void onFdClosed() override;
+    virtual void reduceBucket(int len) override;
 
     void quotaDumpQueue(); ///< dumps quota queue for debugging
 
     /**
      * Configure client write limiting (note:"client" here means - IP). It is called
      * by httpAccept in client_side.cc, where the initial bucket size (anInitialBurst)
      * computed, using the configured maximum bucket vavlue and configured initial
      * bucket value(50% by default).
      *
      * \param  writeSpeedLimit is speed limit configured in config for this pool
      * \param  initialBurst is initial bucket size to use for this client(i.e. client can burst at first)
      *  \param highWatermark is maximum bucket value
      */
     void setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark);
 #endif /* USE_DELAY_POOLS */
 };
 
 #if USE_DELAY_POOLS
 // a queue of Comm clients waiting for I/O quota controlled by delay pools
 class CommQuotaQueue
 {
     CBDATA_CLASS(CommQuotaQueue);
 
 public:
     CommQuotaQueue(ClientInfo *info);
     ~CommQuotaQueue();
 
     bool empty() const { return fds.empty(); }
     size_t size() const { return fds.size(); }
     int front() const { return fds.front(); }

=== modified file 'src/Makefile.am'
--- src/Makefile.am	2017-01-24 19:49:32 +0000
+++ src/Makefile.am	2017-01-30 13:45:53 +0000
@@ -61,81 +61,87 @@
 	snmp_core.cc \
 	snmp_agent.h \
 	snmp_agent.cc
 if ENABLE_SNMP
 SNMP_SOURCE = $(SNMP_ALL_SOURCE)
 SUBDIRS += snmp
 SNMP_LIBS = snmp/libsnmp.la $(SNMPLIB)
 else
 SNMP_SOURCE =
 endif
 DIST_SUBDIRS += snmp
 
 if ENABLE_ADAPTATION
 SUBDIRS += adaptation
 endif
 DIST_SUBDIRS += adaptation
 
 if ENABLE_ESI
 SUBDIRS += esi
 ESI_LIBS = \
 	esi/libesi.la \
 	$(top_builddir)/lib/libTrie/libTrie.a \
 	$(XMLLIB) \
 	$(EXPATLIB)
 else
 ESI_LIBS =
 endif
 DIST_SUBDIRS += esi
 
 DELAY_POOL_ALL_SOURCE = \
+	BandwidthBucket.cc \
+	BandwidthBucket.h \
 	CommonPool.h \
 	CompositePoolNode.h \
 	delay_pools.cc \
 	DelayId.cc \
 	DelayId.h \
 	DelayIdComposite.h \
 	DelayBucket.cc \
 	DelayBucket.h \
 	DelayConfig.cc \
 	DelayConfig.h \
 	DelayPool.cc \
 	DelayPool.h \
 	DelayPools.h \
 	DelaySpec.cc \
 	DelaySpec.h \
 	DelayTagged.cc \
 	DelayTagged.h \
 	DelayUser.cc \
 	DelayUser.h \
 	DelayVector.cc \
 	DelayVector.h \
+	MessageBucket.cc \
+	MessageBucket.h \
+	MessageDelayPools.h \
+	MessageDelayPools.cc \
 	NullDelayId.h \
 	ClientDelayConfig.cc \
 	ClientDelayConfig.h
 
 if ENABLE_DELAY_POOLS
 DELAY_POOL_SOURCE = $(DELAY_POOL_ALL_SOURCE)
 else
 DELAY_POOL_SOURCE =
 endif
 
 if ENABLE_XPROF_STATS
 XPROF_STATS_SOURCE = ProfStats.cc
 else
 XPROF_STATS_SOURCE =
 endif
 
 if ENABLE_HTCP
 HTCPSOURCE = htcp.cc htcp.h
 endif
 
 if ENABLE_LEAKFINDER
 LEAKFINDERSOURCE =  LeakFinder.cc
 else
 LEAKFINDERSOURCE =
 endif
 
 if ENABLE_UNLINKD
 UNLINKDSOURCE = unlinkd.h unlinkd.cc
 UNLINKD = unlinkd
 else

=== added file 'src/MessageBucket.cc'
--- src/MessageBucket.cc	1970-01-01 00:00:00 +0000
+++ src/MessageBucket.cc	2017-01-30 13:45:54 +0000
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+#include "comm/Connection.h"
+#include "DelayPools.h"
+#include "fde.h"
+#include "MessageBucket.h"
+
+MessageBucket::MessageBucket(const int speed, const int initialLevelPercent,
+                             const double sizeLimit, MessageDelayPool::Pointer pool) :
+    BandwidthBucket(speed, initialLevelPercent, sizeLimit),
+    theAggregate(pool) {}
+
+int
+MessageBucket::quota()
+{
+    refillBucket();
+    theAggregate->refillBucket();
+    if (theAggregate->noLimit())
+        return bucketLevel;
+    else if (noLimit())
+        return theAggregate->level();
+    else
+        return min(bucketLevel, static_cast<double>(theAggregate->level()));
+}
+
+void
+MessageBucket::reduceBucket(int len)
+{
+    BandwidthBucket::reduceBucket(len);
+    theAggregate->bytesIn(len);
+}
+
+void
+MessageBucket::scheduleWrite(Comm::IoCallback *state)
+{
+    fde *F = &fd_table[state->conn->fd];
+    if (!F->writeQuotaHandler->selectWaiting) {
+        F->writeQuotaHandler->selectWaiting = true;
+        // message delay pools limit this write; see checkTimeouts()
+        SetSelect(state->conn->fd, COMM_SELECT_WRITE, Comm::HandleWrite, state, 0);
+    }
+}
+
+#endif /* USE_DELAY_POOLS */
+

=== added file 'src/MessageBucket.h'
--- src/MessageBucket.h	1970-01-01 00:00:00 +0000
+++ src/MessageBucket.h	2017-01-30 13:45:54 +0000
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef MESSAGEBUCKET_H
+#define MESSAGEBUCKET_H
+
+#if USE_DELAY_POOLS
+
+#include "BandwidthBucket.h"
+#include "base/RefCount.h"
+#include "comm/forward.h"
+#include "MessageDelayPools.h"
+
+/// Limits Squid-to-client bandwidth for each matching response
+class MessageBucket : public RefCountable, public BandwidthBucket
+{
+    MEMPROXY_CLASS(MessageBucket);
+
+public:
+    typedef RefCount<MessageBucket> Pointer;
+
+    MessageBucket(const int speed, const int initialLevelPercent, const double sizeLimit, MessageDelayPool::Pointer pool);
+
+    /* BandwidthBucket API */
+    virtual int quota() override;
+    virtual void scheduleWrite(Comm::IoCallback *state) override;
+    virtual void reduceBucket(int len);
+
+private:
+    MessageDelayPool::Pointer theAggregate;
+};
+
+#endif /* USE_DELAY_POOLS */
+
+#endif
+

=== added file 'src/MessageDelayPools.cc'
--- src/MessageDelayPools.cc	1970-01-01 00:00:00 +0000
+++ src/MessageDelayPools.cc	2017-01-30 13:45:54 +0000
@@ -0,0 +1,202 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+#include "acl/Gadgets.h"
+#include "cache_cf.h"
+#include "ConfigParser.h"
+#include "DelaySpec.h"
+#include "event.h"
+#include "MessageBucket.h"
+#include "MessageDelayPools.h"
+#include "Parsing.h"
+#include "SquidTime.h"
+#include "Store.h"
+
+#include <algorithm>
+#include <map>
+
+MessageDelayPools::~MessageDelayPools()
+{
+    freePools();
+}
+
+MessageDelayPools *
+MessageDelayPools::Instance()
+{
+    static MessageDelayPools pools;
+    return &pools;
+}
+
+MessageDelayPool::Pointer
+MessageDelayPools::pool(const SBuf &name)
+{
+    auto it = std::find_if(pools.begin(), pools.end(),
+    [&name](const MessageDelayPool::Pointer p) { return p->poolName == name; });
+    return it == pools.end() ? 0 : *it;
+}
+
+void
+MessageDelayPools::add(MessageDelayPool *p)
+{
+    const auto it = std::find_if(pools.begin(), pools.end(),
+    [&p](const MessageDelayPool::Pointer mp) { return mp->poolName == p->poolName; });
+    if (it != pools.end()) {
+        debugs(3, DBG_CRITICAL, "WARNING: Ignoring duplicate " << p->poolName << " response delay pool");
+        return;
+    }
+    pools.push_back(p);
+}
+
+void
+MessageDelayPools::freePools()
+{
+    pools.clear();
+}
+
+MessageDelayPool::MessageDelayPool(const SBuf &name, int64_t bucketSpeed, int64_t bucketSize,
+                                   int64_t aggregateSpeed, int64_t aggregateSize, uint16_t initialBucketPercent):
+    access(0),
+    poolName(name),
+    individualRestore(bucketSpeed),
+    individualMaximum(bucketSize),
+    aggregateRestore(aggregateSpeed),
+    aggregateMaximum(aggregateSize),
+    initialBucketLevel(initialBucketPercent),
+    lastUpdate(squid_curtime)
+{
+    theBucket.level() = aggregateMaximum;
+}
+
+MessageDelayPool::~MessageDelayPool()
+{
+    if (access)
+        aclDestroyAccessList(&access);
+}
+
+void
+MessageDelayPool::refillBucket()
+{
+    if (noLimit())
+        return;
+    const int incr = squid_curtime - lastUpdate;
+    if (incr >= 1) {
+        lastUpdate = squid_curtime;
+        DelaySpec spec;
+        spec.restore_bps = aggregateRestore;
+        spec.max_bytes = aggregateMaximum;
+        theBucket.update(spec, incr);
+    }
+}
+
+void
+MessageDelayPool::dump(StoreEntry *entry) const
+{
+    SBuf name("response_delay_pool_access ");
+    name.append(poolName);
+    dump_acl_access(entry, name.c_str(), access);
+    storeAppendPrintf(entry, "response_delay_pool parameters %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %d\n",
+                      individualRestore, individualMaximum, aggregateRestore, aggregateMaximum, initialBucketLevel);
+    storeAppendPrintf(entry, "\n");
+}
+
+MessageBucket::Pointer
+MessageDelayPool::createBucket()
+{
+    return new MessageBucket(individualRestore, initialBucketLevel, individualMaximum, this);
+}
+
+void
+MessageDelayConfig::parseResponseDelayPool()
+{
+    static const SBuf bucketSpeedLimit("individual-restore");
+    static const SBuf maxBucketSize("individual-maximum");
+    static const SBuf aggregateSpeedLimit("aggregate-restore");
+    static const SBuf maxAggregateSize("aggregate-maximum");
+    static const SBuf initialBucketPercent("initial-bucket-level");
+
+    static std::map<SBuf, int64_t> params;
+    params[bucketSpeedLimit] = -1;
+    params[maxBucketSize] = -1;
+    params[aggregateSpeedLimit] = -1;
+    params[maxAggregateSize] = -1;
+    params[initialBucketPercent] = 50;
+
+    const SBuf name(ConfigParser::NextToken());
+    if (name.isEmpty()) {
+        debugs(3, DBG_CRITICAL, "FATAL: response_delay_pool missing required \"name\" parameter.");
+        self_destruct();
+    }
+
+    char *key = nullptr;
+    char *value = nullptr;
+    while (ConfigParser::NextKvPair(key, value)) {
+        if (!value) {
+            debugs(3, DBG_CRITICAL, "FATAL: '" << key << "' option missing value");
+            self_destruct();
+        }
+        auto it = params.find(SBuf(key));
+        if (it == params.end()) {
+            debugs(3, DBG_CRITICAL, "FATAL: response_delay_pool unknown option '" << key << "'");
+            self_destruct();
+        }
+        it->second = (it->first == initialBucketPercent) ? xatos(value) : xatoll(value, 10);
+    }
+
+    const char *fatalMsg = nullptr;
+    if ((params[bucketSpeedLimit] < 0) != (params[maxBucketSize] < 0))
+        fatalMsg = "'individual-restore' and 'individual-maximum'";
+    else if ((params[aggregateSpeedLimit] < 0) != (params[maxAggregateSize] < 0))
+        fatalMsg = "'aggregate-restore' and 'aggregate-maximum'";
+
+    if (fatalMsg) {
+        debugs(3, DBG_CRITICAL, "FATAL: must use " << fatalMsg << " options in conjunction");
+        self_destruct();
+    }
+
+    MessageDelayPool *pool = new MessageDelayPool(name,
+            params[bucketSpeedLimit],
+            params[maxBucketSize],
+            params[aggregateSpeedLimit],
+            params[maxAggregateSize],
+            static_cast<uint16_t>(params[initialBucketPercent])
+                                                 );
+    MessageDelayPools::Instance()->add(pool);
+}
+
+void
+MessageDelayConfig::parseResponseDelayPoolAccess() {
+    const char *token = ConfigParser::NextToken();
+    if (!token) {
+        debugs(3, DBG_CRITICAL, "ERROR: required pool_name option missing");
+        return;
+    }
+    MessageDelayPool::Pointer pool = MessageDelayPools::Instance()->pool(SBuf(token));
+    static ConfigParser parser;
+    if (pool)
+        aclParseAccessLine("response_delay_pool_access", parser, &pool->access);
+}
+
+void
+MessageDelayConfig::freePools()
+{
+    MessageDelayPools::Instance()->freePools();
+}
+
+void
+MessageDelayConfig::dumpResponseDelayPoolParameters(StoreEntry *entry, const char *name)
+{
+    auto &pools = MessageDelayPools::Instance()->pools;
+    for (auto pool: pools)
+        pool->dump(entry);
+}
+
+#endif
+

=== added file 'src/MessageDelayPools.h'
--- src/MessageDelayPools.h	1970-01-01 00:00:00 +0000
+++ src/MessageDelayPools.h	2017-01-30 13:45:54 +0000
@@ -0,0 +1,134 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef MESSAGEDELAYPOOLS_H
+#define MESSAGEDELAYPOOLS_H
+
+#if USE_DELAY_POOLS
+
+#include "acl/Acl.h"
+#include "base/RefCount.h"
+#include "DelayBucket.h"
+#include "DelayPools.h"
+
+class MessageBucket;
+typedef RefCount<MessageBucket> MessageBucketPointer;
+
+/// \ingroup DelayPoolsAPI
+/// Represents one 'response' delay pool, creates individual response
+/// buckets and performs aggregate limiting for them
+class MessageDelayPool : public RefCountable
+{
+public:
+    typedef RefCount<MessageDelayPool> Pointer;
+
+    MessageDelayPool(const SBuf &name, int64_t bucketSpeed, int64_t bucketSize,
+                     int64_t aggregateSpeed, int64_t aggregateSize, uint16_t initialBucketPercent);
+    ~MessageDelayPool();
+    MessageDelayPool(const MessageDelayPool &) = delete;
+    MessageDelayPool &operator=(const MessageDelayPool &) = delete;
+
+    /// Increases the aggregate bucket level with the aggregateRestore speed.
+    void refillBucket();
+    /// decreases the aggregate level
+    void bytesIn(int qty) { if (!noLimit()) theBucket.bytesIn(qty); }
+    /// current aggregate level
+    int level() { return theBucket.level(); }
+    /// creates an individual response bucket
+    MessageBucketPointer createBucket();
+    /// whether the aggregate bucket has no limit
+    bool noLimit () const { return aggregateRestore < 0; }
+
+    void dump (StoreEntry * entry) const;
+
+    acl_access *access;
+    /// the response delay pool name
+    SBuf poolName;
+    /// the speed limit of an individual bucket (bytes/s)
+    int64_t individualRestore;
+    /// the maximum size of an individual bucket
+    int64_t individualMaximum;
+    /// the speed limit of the aggregate bucket (bytes/s)
+    int64_t aggregateRestore;
+    /// the maximum size of the aggregate bucket
+    int64_t aggregateMaximum;
+    /// the initial bucket size as a percentage of individualMaximum
+    uint16_t initialBucketLevel;
+    /// the aggregate bucket
+    DelayBucket theBucket;
+
+private:
+    /// Time the aggregate bucket level was last refilled.
+    time_t lastUpdate;
+};
+
+/// \ingroup DelayPoolsAPI
+/// represents all configured 'response' delay pools
+class MessageDelayPools
+{
+public:
+    MessageDelayPools(const MessageDelayPools &) = delete;
+    MessageDelayPools &operator=(const MessageDelayPools &) = delete;
+
+    static MessageDelayPools *Instance();
+
+    /// returns a MessageDelayPool with a given name or null otherwise
+    MessageDelayPool::Pointer pool(const SBuf &name);
+    /// appends a single MessageDelayPool, created during configuration
+    void add(MessageDelayPool *pool);
+    /// memory cleanup, performed during reconfiguration
+    void freePools();
+
+    std::vector<MessageDelayPool::Pointer> pools;
+
+private:
+    MessageDelayPools() {}
+    ~MessageDelayPools();
+    void Stats() { } // TODO
+};
+
+/// represents configuration for response delay pools
+class MessageDelayConfig
+{
+public:
+    void parseResponseDelayPool();
+    void dumpResponseDelayPoolParameters(StoreEntry *e, const char *name);
+    void parseResponseDelayPoolAccess();
+    void freePools();
+};
+
+#define free_response_delay_pool_access(X)
+#define dump_response_delay_pool_access(X, Y, Z)
+
+inline void
+free_response_delay_pool_parameters(MessageDelayConfig * cfg)
+{
+    cfg->freePools();
+}
+
+inline void
+dump_response_delay_pool_parameters(StoreEntry *entry, const char *name, MessageDelayConfig &cfg)
+{
+    cfg.dumpResponseDelayPoolParameters(entry, name);
+}
+
+inline void
+parse_response_delay_pool_parameters(MessageDelayConfig * cfg)
+{
+    cfg->parseResponseDelayPool();
+}
+
+inline void
+parse_response_delay_pool_access(MessageDelayConfig * cfg)
+{
+    cfg->parseResponseDelayPoolAccess();
+}
+
+#endif
+#endif
+

=== modified file 'src/SquidConfig.h'
--- src/SquidConfig.h	2017-01-01 00:12:22 +0000
+++ src/SquidConfig.h	2017-01-30 13:45:55 +0000
@@ -1,49 +1,54 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef SQUID_SQUIDCONFIG_H_
 #define SQUID_SQUIDCONFIG_H_
 
 #include "acl/forward.h"
 #include "base/RefCount.h"
 #include "base/YesNoNone.h"
+#if USE_DELAY_POOLS
 #include "ClientDelayConfig.h"
 #include "DelayConfig.h"
+#endif
 #include "helper/ChildConfig.h"
 #include "HttpHeaderTools.h"
 #include "ip/Address.h"
+#if USE_DELAY_POOLS
+#include "MessageDelayPools.h"
+#endif
 #include "Notes.h"
 #include "security/forward.h"
 #include "SquidTime.h"
 #if USE_OPENSSL
 #include "ssl/support.h"
 #endif
 #include "store/forward.h"
 
 #if USE_OPENSSL
 class sslproxy_cert_sign;
 class sslproxy_cert_adapt;
 #endif
 
 namespace Mgr
 {
 class ActionPasswordList;
 } // namespace Mgr
 class CachePeer;
 class CustomLog;
 class CpuAffinityMap;
 class external_acl;
 class HeaderManglers;
 class RefreshPattern;
 class RemovalPolicySettings;
 
 namespace AnyP
 {
 class PortCfg;
 }
 
@@ -407,60 +412,61 @@
         int eprt;
         int sanitycheck;
         int telnet;
     } Ftp;
     RefreshPattern *Refresh;
 
     Store::DiskConfig cacheSwap;
 
     struct {
         char *directory;
         int use_short_names;
     } icons;
     char *errorDirectory;
 #if USE_ERR_LOCALES
     char *errorDefaultLanguage;
     int errorLogMissingLanguages;
 #endif
     char *errorStylesheet;
 
     struct {
         int onerror;
     } retry;
 
     struct {
         int64_t limit;
     } MemPools;
 #if USE_DELAY_POOLS
 
     DelayConfig Delay;
     ClientDelayConfig ClientDelay;
+    MessageDelayConfig MessageDelay;
 #endif
 
     struct {
         struct {
             int average;
             int min_poll;
         } dns, udp, tcp;
     } comm_incoming;
     int max_open_disk_fds;
     int uri_whitespace;
     AclSizeLimit *rangeOffsetLimit;
 #if MULTICAST_MISS_STREAM
 
     struct {
 
         Ip::Address addr;
         int ttl;
         unsigned short port;
         char *encode_key;
     } mcast_miss;
 #endif
 
     /// request_header_access and request_header_replace
     HeaderManglers *request_header_access;
     /// reply_header_access and reply_header_replace
     HeaderManglers *reply_header_access;
     ///request_header_add access list
     HeaderWithAclList *request_header_add;
     ///reply_header_add access list
     HeaderWithAclList *reply_header_add;

=== modified file 'src/cache_cf.cc'
--- src/cache_cf.cc	2017-01-01 00:12:22 +0000
+++ src/cache_cf.cc	2017-01-30 13:46:33 +0000
@@ -15,60 +15,61 @@
 #include "acl/Address.h"
 #include "acl/Gadgets.h"
 #include "acl/MethodData.h"
 #include "acl/Tree.h"
 #include "anyp/PortCfg.h"
 #include "anyp/UriScheme.h"
 #include "auth/Config.h"
 #include "auth/Scheme.h"
 #include "AuthReg.h"
 #include "base/RunnersRegistry.h"
 #include "cache_cf.h"
 #include "CachePeer.h"
 #include "ConfigParser.h"
 #include "CpuAffinityMap.h"
 #include "DiskIO/DiskIOModule.h"
 #include "eui/Config.h"
 #include "ExternalACL.h"
 #include "format/Format.h"
 #include "ftp/Elements.h"
 #include "globals.h"
 #include "HttpHeaderTools.h"
 #include "icmp/IcmpConfig.h"
 #include "ident/Config.h"
 #include "ip/Intercept.h"
 #include "ip/QosConfig.h"
 #include "ip/tools.h"
 #include "ipc/Kids.h"
 #include "log/Config.h"
 #include "log/CustomLog.h"
 #include "MemBuf.h"
+#include "MessageDelayPools.h"
 #include "mgr/ActionPasswordList.h"
 #include "mgr/Registration.h"
 #include "neighbors.h"
 #include "NeighborTypeDomainList.h"
 #include "Parsing.h"
 #include "pconn.h"
 #include "PeerDigest.h"
 #include "PeerPoolMgr.h"
 #include "redirect.h"
 #include "RefreshPattern.h"
 #include "rfc1738.h"
 #include "sbuf/List.h"
 #include "SquidConfig.h"
 #include "SquidString.h"
 #include "ssl/ProxyCerts.h"
 #include "Store.h"
 #include "store/Disk.h"
 #include "store/Disks.h"
 #include "StoreFileSystem.h"
 #include "tools.h"
 #include "util.h"
 #include "wordlist.h"
 /* wccp2 has its own conditional definitions */
 #include "wccp2.h"
 #if USE_ADAPTATION
 #include "adaptation/Config.h"
 #endif
 #if ICAP_CLIENT
 #include "adaptation/icap/Config.h"
 #endif
@@ -1627,61 +1628,61 @@
 }
 
 static void
 parse_delay_pool_rates(DelayConfig * cfg)
 {
     cfg->parsePoolRates();
 }
 
 static void
 parse_delay_pool_access(DelayConfig * cfg)
 {
     cfg->parsePoolAccess(LegacyParser);
 }
 
 #endif
 
 #if USE_DELAY_POOLS
 #include "ClientDelayConfig.h"
 /* do nothing - free_client_delay_pool_count is the magic free function.
  * this is why client_delay_pool_count isn't just marked TYPE: u_short
  */
 
 #define free_client_delay_pool_access(X)
 #define free_client_delay_pool_rates(X)
 #define dump_client_delay_pool_access(X, Y, Z)
 #define dump_client_delay_pool_rates(X, Y, Z)
 
 static void
 free_client_delay_pool_count(ClientDelayConfig * cfg)
 {
-    cfg->freePoolCount();
+    cfg->freePools();
 }
 
 static void
 dump_client_delay_pool_count(StoreEntry * entry, const char *name, ClientDelayConfig &cfg)
 {
     cfg.dumpPoolCount (entry, name);
 }
 
 static void
 parse_client_delay_pool_count(ClientDelayConfig * cfg)
 {
     cfg->parsePoolCount();
 }
 
 static void
 parse_client_delay_pool_rates(ClientDelayConfig * cfg)
 {
     cfg->parsePoolRates();
 }
 
 static void
 parse_client_delay_pool_access(ClientDelayConfig * cfg)
 {
     cfg->parsePoolAccess(LegacyParser);
 }
 #endif
 
 #if USE_HTTP_VIOLATIONS
 static void
 dump_http_header_access(StoreEntry * entry, const char *name, const HeaderManglers *manglers)

=== modified file 'src/cf.data.depend'
--- src/cf.data.depend	2017-01-01 00:12:22 +0000
+++ src/cf.data.depend	2017-01-22 08:42:37 +0000
@@ -6,60 +6,62 @@
 ##
 #
 # type			dependencies
 #
 access_log		acl	logformat
 acl			external_acl_type auth_param
 acl_access		acl
 acl_address		acl
 acl_b_size_t		acl
 acl_tos			acl
 acl_nfmark		acl
 address
 authparam
 AuthSchemes		acl auth_param
 b_int64_t
 b_size_t
 b_ssize_t
 cachedir		cache_replacement_policy
 cachemgrpasswd
 ConfigAclTos
 configuration_includes_quoted_values
 CpuAffinityMap
 debug
 delay_pool_access	acl	delay_class
 delay_pool_class	delay_pools
 delay_pool_count
 delay_pool_rates	delay_class
 client_delay_pool_access	acl
 client_delay_pool_count
 client_delay_pool_rates
+response_delay_pool_access	acl
+response_delay_pool_parameters
 denyinfo		acl
 eol
 externalAclHelper	auth_param
 HelperChildConfig
 hostdomain		cache_peer
 hostdomaintype		cache_peer
 http_header_access	acl
 http_header_replace
 HeaderWithAclList	acl
 adaptation_access_type	adaptation_service_set adaptation_service_chain acl icap_service icap_class
 adaptation_service_set_type	icap_service ecap_service
 adaptation_service_chain_type	icap_service ecap_service
 icap_access_type	icap_class acl
 icap_class_type		icap_service
 icap_service_type
 icap_service_failure_limit
 icmp
 ecap_service_type
 int
 int64_t
 kb_int64_t
 kb_size_t
 logformat
 YesNoNone
 memcachemode
 note			acl
 obsolete
 onoff
 on_unsupported_protocol	acl
 peer

=== modified file 'src/cf.data.pre'
--- src/cf.data.pre	2017-01-30 18:10:15 +0000
+++ src/cf.data.pre	2017-01-30 22:18:05 +0000
@@ -7158,60 +7158,118 @@
 	request:
 
 	    client_delay_access pool_ID allow|deny acl_name
 
 	All client_delay_access options are checked in their pool ID
 	order, starting with pool 1. The first checked pool with allowed
 	request is selected for the request. If no ACL matches or there
 	are no client_delay_access options, the request bandwidth is not
 	limited.
 
 	The ACL-selected pool is then used to find the
 	client_delay_parameters for the request. Client-side pools are
 	not used to aggregate clients. Clients are always aggregated
 	based on their source IP addresses (one bucket per source IP).
 
 	This clause only supports fast acl types.
 	See http://wiki.squid-cache.org/SquidFaq/SquidAcl for details.
 	Additionally, only the client TCP connection details are available.
 	ACLs testing HTTP properties will not work.
 
 	Please see delay_access for more examples.
 
 	Example:
 		client_delay_access 1 allow low_rate_network
 		client_delay_access 2 allow vips_network
 
 
 	See also client_delay_parameters and client_delay_pools.
 DOC_END
 
+NAME: response_delay_pool
+TYPE: response_delay_pool_parameters
+DEFAULT: none
+IFDEF: USE_DELAY_POOLS
+LOC: Config.MessageDelay
+DOC_START
+	This option configures client response bandwidth limits using the
+	following format:
+
+	response_delay_pool name [option=value] ...
+
+	name	the response delay pool name
+
+	available options:
+
+		individual-restore	The speed limit of an individual
+					bucket (bytes/s). To be used in conjunction
+					with 'individual-maximum'.
+
+		individual-maximum	The maximum number of bytes which can
+					be placed into the individual bucket. To be used
+					in conjunction with 'individual-restore'.
+
+		aggregate-restore	The speed limit for the aggregate
+					bucket (bytes/s). To be used in conjunction with
+					'aggregate-maximum'.
+
+		aggregate-maximum	The maximum number of bytes which can
+					be placed into the aggregate bucket. To be used
+					in conjunction with 'aggregate-restore'.
+
+		initial-bucket-level	The initial bucket size as a percentage
+					of individual-maximum.
+
+	Individual and/or aggregate bucket options may be omitted,
+	meaning no individual and/or aggregate speed limitation.
+	See also response_delay_pool_access and delay_parameters for
+	terminology details.
+DOC_END
+
+NAME: response_delay_pool_access
+TYPE: response_delay_pool_access
+DEFAULT: none
+DEFAULT_DOC: Deny use of the pool, unless allow rules exist in squid.conf for the pool.
+IFDEF: USE_DELAY_POOLS
+LOC: Config.MessageDelay
+DOC_START
+	Determines whether a specific named response delay pool is used
+	for the transaction. The syntax for this directive is:
+
+	response_delay_pool_access pool_name allow|deny acl_name
+
+	All response_delay_pool_access options are checked in the order
+	they appear in this configuration file. The first rule with a
+	matching ACL wins. If (and only if) an "allow" rule won, Squid
+	assigns the response to the corresponding named delay pool.
+DOC_END
+
 COMMENT_START
  WCCPv1 AND WCCPv2 CONFIGURATION OPTIONS
  -----------------------------------------------------------------------------
 COMMENT_END
 
 NAME: wccp_router
 TYPE: address
 LOC: Config.Wccp.router
 DEFAULT: any_addr
 DEFAULT_DOC: WCCP disabled.
 IFDEF: USE_WCCP
 DOC_START
 	Use this option to define your WCCP ``home'' router for
 	Squid.
 
 	wccp_router supports a single WCCP(v1) router
 
 	wccp2_router supports multiple WCCPv2 routers
 
 	only one of the two may be used at the same time and defines
 	which version of WCCP to use.
 DOC_END
 
 NAME: wccp2_router
 TYPE: IpAddress_list
 LOC: Config.Wccp2.router
 DEFAULT: none
 DEFAULT_DOC: WCCPv2 disabled.
 IFDEF: USE_WCCPv2
 DOC_START

=== modified file 'src/client_db.cc'
--- src/client_db.cc	2017-01-01 00:12:22 +0000
+++ src/client_db.cc	2017-01-30 13:46:34 +0000
@@ -26,94 +26,85 @@
 #include "tools.h"
 
 #if SQUID_SNMP
 #include "snmp_core.h"
 #endif
 
 static hash_table *client_table = NULL;
 
 static ClientInfo *clientdbAdd(const Ip::Address &addr);
 static FREE clientdbFreeItem;
 static void clientdbStartGC(void);
 static void clientdbScheduledGC(void *);
 
 #if USE_DELAY_POOLS
 static int max_clients = 32768;
 #else
 static int max_clients = 32;
 #endif
 
 static int cleanup_running = 0;
 static int cleanup_scheduled = 0;
 static int cleanup_removed;
 
 #if USE_DELAY_POOLS
 #define CLIENT_DB_HASH_SIZE 65357
 #else
 #define CLIENT_DB_HASH_SIZE 467
 #endif
 
 ClientInfo::ClientInfo(const Ip::Address &ip) :
+#if USE_DELAY_POOLS
+    BandwidthBucket(0, 0, 0),
+#endif
     addr(ip),
     n_established(0),
     last_seen(0)
 #if USE_DELAY_POOLS
-    , writeSpeedLimit(0),
-    prevTime(0),
-    bucketSize(0),
-    bucketSizeLimit(0),
-    writeLimitingActive(false),
+    , writeLimitingActive(false),
     firstTimeConnection(true),
     quotaQueue(nullptr),
     rationedQuota(0),
     rationedCount(0),
-    selectWaiting(false),
     eventWaiting(false)
 #endif
 {
     debugs(77, 9, "ClientInfo constructed, this=" << static_cast<void*>(this));
-
-#if USE_DELAY_POOLS
-    getCurrentTime();
-    /* put current time to have something sensible here */
-    prevTime = current_dtime;
-#endif
-
     char *buf = static_cast<char*>(xmalloc(MAX_IPSTRLEN)); // becomes hash.key
-    hash.key = addr.toStr(buf,MAX_IPSTRLEN);
+    key = addr.toStr(buf,MAX_IPSTRLEN);
 }
 
 static ClientInfo *
 clientdbAdd(const Ip::Address &addr)
 {
     ClientInfo *c = new ClientInfo(addr);
-    hash_join(client_table, &c->hash);
+    hash_join(client_table, static_cast<hash_link*>(c));
     ++statCounter.client_http.clients;
 
     if ((statCounter.client_http.clients > max_clients) && !cleanup_running && cleanup_scheduled < 2) {
         ++cleanup_scheduled;
         eventAdd("client_db garbage collector", clientdbScheduledGC, NULL, 90, 0);
     }
 
     return c;
 }
 
 static void
 clientdbInit(void)
 {
     if (client_table)
         return;
 
     client_table = hash_create((HASHCMP *) strcmp, CLIENT_DB_HASH_SIZE, hash_string);
 }
 
 class ClientDbRr: public RegisteredRunner
 {
 public:
     /* RegisteredRunner API */
     virtual void useConfig();
 };
 RunnerRegistrationEntry(ClientDbRr);
 
 void
 ClientDbRr::useConfig()
 {
@@ -250,245 +241,237 @@
         NR = 150;
 
     ND = c->Icp.result_hist[LOG_UDP_DENIED] - c->cutoff.n_denied;
 
     p = 100.0 * ND / NR;
 
     if (p < 95.0)
         return 0;
 
     debugs(1, DBG_CRITICAL, "WARNING: Probable misconfigured neighbor at " << key);
 
     debugs(1, DBG_CRITICAL, "WARNING: " << ND << " of the last " << NR <<
            " ICP replies are DENIED");
 
     debugs(1, DBG_CRITICAL, "WARNING: No replies will be sent for the next " <<
            CUTOFF_SECONDS << " seconds");
 
     c->cutoff.time = squid_curtime;
 
     c->cutoff.n_req = c->Icp.n_requests;
 
     c->cutoff.n_denied = c->Icp.result_hist[LOG_UDP_DENIED];
 
     return 1;
 }
 
 void
 clientdbDump(StoreEntry * sentry)
 {
     const char *name;
-    ClientInfo *c;
     int icp_total = 0;
     int icp_hits = 0;
     int http_total = 0;
     int http_hits = 0;
     storeAppendPrintf(sentry, "Cache Clients:\n");
     hash_first(client_table);
 
-    while ((c = (ClientInfo *) hash_next(client_table))) {
-        storeAppendPrintf(sentry, "Address: %s\n", hashKeyStr(&c->hash));
+    while (hash_link *hash = hash_next(client_table)) {
+        const ClientInfo *c = reinterpret_cast<const ClientInfo *>(hash);
+        storeAppendPrintf(sentry, "Address: %s\n", hashKeyStr(hash));
         if ( (name = fqdncache_gethostbyaddr(c->addr, 0)) ) {
             storeAppendPrintf(sentry, "Name:    %s\n", name);
         }
         storeAppendPrintf(sentry, "Currently established connections: %d\n",
                           c->n_established);
         storeAppendPrintf(sentry, "    ICP  Requests %d\n",
                           c->Icp.n_requests);
 
         for (LogTags_ot l = LOG_TAG_NONE; l < LOG_TYPE_MAX; ++l) {
             if (c->Icp.result_hist[l] == 0)
                 continue;
 
             icp_total += c->Icp.result_hist[l];
 
             if (LOG_UDP_HIT == l)
                 icp_hits += c->Icp.result_hist[l];
 
             storeAppendPrintf(sentry, "        %-20.20s %7d %3d%%\n", LogTags(l).c_str(), c->Icp.result_hist[l], Math::intPercent(c->Icp.result_hist[l], c->Icp.n_requests));
         }
 
         storeAppendPrintf(sentry, "    HTTP Requests %d\n", c->Http.n_requests);
 
         for (LogTags_ot l = LOG_TAG_NONE; l < LOG_TYPE_MAX; ++l) {
             if (c->Http.result_hist[l] == 0)
                 continue;
 
             http_total += c->Http.result_hist[l];
 
             if (LogTags(l).isTcpHit())
                 http_hits += c->Http.result_hist[l];
 
             storeAppendPrintf(sentry,
                               "        %-20.20s %7d %3d%%\n",
                               LogTags(l).c_str(),
                               c->Http.result_hist[l],
                               Math::intPercent(c->Http.result_hist[l], c->Http.n_requests));
         }
 
         storeAppendPrintf(sentry, "\n");
     }
 
     storeAppendPrintf(sentry, "TOTALS\n");
     storeAppendPrintf(sentry, "ICP : %d Queries, %d Hits (%3d%%)\n",
                       icp_total, icp_hits, Math::intPercent(icp_hits, icp_total));
     storeAppendPrintf(sentry, "HTTP: %d Requests, %d Hits (%3d%%)\n",
                       http_total, http_hits, Math::intPercent(http_hits, http_total));
 }
 
 static void
 clientdbFreeItem(void *data)
 {
     ClientInfo *c = (ClientInfo *)data;
     delete c;
 }
 
 ClientInfo::~ClientInfo()
 {
-    safe_free(hash.key);
+    safe_free(key);
 
 #if USE_DELAY_POOLS
     if (CommQuotaQueue *q = quotaQueue) {
         q->clientInfo = NULL;
         delete q; // invalidates cbdata, cancelling any pending kicks
     }
 #endif
 
     debugs(77, 9, "ClientInfo destructed, this=" << static_cast<void*>(this));
 }
 
 void
 clientdbFreeMemory(void)
 {
     hashFreeItems(client_table, clientdbFreeItem);
     hashFreeMemory(client_table);
     client_table = NULL;
 }
 
 static void
 clientdbScheduledGC(void *)
 {
     cleanup_scheduled = 0;
     clientdbStartGC();
 }
 
 static void
 clientdbGC(void *)
 {
     static int bucket = 0;
     hash_link *link_next;
 
     link_next = hash_get_bucket(client_table, bucket++);
 
     while (link_next != NULL) {
         ClientInfo *c = (ClientInfo *)link_next;
         int age = squid_curtime - c->last_seen;
         link_next = link_next->next;
 
         if (c->n_established)
             continue;
 
         if (age < 24 * 3600 && c->Http.n_requests > 100)
             continue;
 
         if (age < 4 * 3600 && (c->Http.n_requests > 10 || c->Icp.n_requests > 10))
             continue;
 
         if (age < 5 * 60 && (c->Http.n_requests > 1 || c->Icp.n_requests > 1))
             continue;
 
         if (age < 60)
             continue;
 
-        hash_remove_link(client_table, &c->hash);
+        hash_remove_link(client_table, static_cast<hash_link*>(c));
 
         clientdbFreeItem(c);
 
         --statCounter.client_http.clients;
 
         ++cleanup_removed;
     }
 
     if (bucket < CLIENT_DB_HASH_SIZE)
         eventAdd("client_db garbage collector", clientdbGC, NULL, 0.15, 0);
     else {
         bucket = 0;
         cleanup_running = 0;
         max_clients = statCounter.client_http.clients * 3 / 2;
 
         if (!cleanup_scheduled) {
             cleanup_scheduled = 1;
             eventAdd("client_db garbage collector", clientdbScheduledGC, NULL, 6 * 3600, 0);
         }
 
         debugs(49, 2, "clientdbGC: Removed " << cleanup_removed << " entries");
     }
 }
 
 static void
 clientdbStartGC(void)
 {
     max_clients = statCounter.client_http.clients;
     cleanup_running = 1;
     cleanup_removed = 0;
     clientdbGC(NULL);
 }
 
 #if SQUID_SNMP
 
 Ip::Address *
 client_entry(Ip::Address *current)
 {
-    ClientInfo *c = NULL;
     char key[MAX_IPSTRLEN];
+    hash_first(client_table);
 
     if (current) {
         current->toStr(key,MAX_IPSTRLEN);
-        hash_first(client_table);
-        while ((c = (ClientInfo *) hash_next(client_table))) {
-            if (!strcmp(key, hashKeyStr(&c->hash)))
+        while (hash_link *hash = hash_next(client_table)) {
+            if (!strcmp(key, hashKeyStr(hash)))
                 break;
         }
-
-        c = (ClientInfo *) hash_next(client_table);
-    } else {
-        hash_first(client_table);
-        c = (ClientInfo *) hash_next(client_table);
     }
 
+    ClientInfo *c = static_cast<ClientInfo *>(hash_next(client_table));
+
     hash_last(client_table);
 
-    if (c)
-        return (&c->addr);
-    else
-        return (NULL);
-
+    return c ? &c->addr : nullptr;
 }
 
 variable_list *
 snmp_meshCtblFn(variable_list * Var, snint * ErrP)
 {
     char key[MAX_IPSTRLEN];
     ClientInfo *c = NULL;
     Ip::Address keyIp;
 
     *ErrP = SNMP_ERR_NOERROR;
     MemBuf tmp;
     debugs(49, 6, HERE << "Current : length=" << Var->name_length << ": " << snmpDebugOid(Var->name, Var->name_length, tmp));
     if (Var->name_length == 16) {
         oid2addr(&(Var->name[12]), keyIp, 4);
     } else if (Var->name_length == 28) {
         oid2addr(&(Var->name[12]), keyIp, 16);
     } else {
         *ErrP = SNMP_ERR_NOSUCHNAME;
         return NULL;
     }
 
     keyIp.toStr(key, sizeof(key));
     debugs(49, 5, HERE << "[" << key << "] requested!");
     c = (ClientInfo *) hash_lookup(client_table, key);
 
     if (c == NULL) {
         debugs(49, 5, HERE << "not found.");
         *ErrP = SNMP_ERR_NOSUCHNAME;
         return NULL;
     }
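
A note for reviewers of the clientdb hunks above: they rely on ClientInfo
deriving from hash_link instead of embedding a "hash" member, which is why
the destructor frees the inherited "key" and the GC casts with
static_cast<hash_link*>(c). A minimal sketch of the assumed class shape and
the resulting iteration idiom (simplified; the real declaration is in
src/ClientInfo.h):

// sketch only, assuming ClientInfo is now an intrusive hash_link
class ClientInfo : public hash_link
{
public:
    explicit ClientInfo(const Ip::Address &ip);
    ~ClientInfo();
    Ip::Address addr;
    // ... ICP/HTTP counters and write-limiter state ...
};

// walking the table no longer needs the &c->hash detour:
hash_first(client_table);
while (hash_link *h = hash_next(client_table)) {
    const ClientInfo *c = static_cast<const ClientInfo *>(h);
    storeAppendPrintf(sentry, "Address: %s\n", hashKeyStr(h));
    // ... per-client output using c ...
}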

=== modified file 'src/client_side.cc'
--- src/client_side.cc	2017-01-30 12:46:15 +0000
+++ src/client_side.cc	2017-01-30 22:18:05 +0000
@@ -93,60 +93,61 @@
 #include "HttpReply.h"
 #include "HttpRequest.h"
 #include "ident/Config.h"
 #include "ident/Ident.h"
 #include "internal.h"
 #include "ipc/FdNotes.h"
 #include "ipc/StartListening.h"
 #include "log/access_log.h"
 #include "MemBuf.h"
 #include "MemObject.h"
 #include "mime_header.h"
 #include "parser/Tokenizer.h"
 #include "profiler/Profiler.h"
 #include "rfc1738.h"
 #include "security/NegotiationHistory.h"
 #include "servers/forward.h"
 #include "SquidConfig.h"
 #include "SquidTime.h"
 #include "StatCounters.h"
 #include "StatHist.h"
 #include "Store.h"
 #include "TimeOrTag.h"
 #include "tools.h"
 #include "URL.h"
 
 #if USE_AUTH
 #include "auth/UserRequest.h"
 #endif
 #if USE_DELAY_POOLS
 #include "ClientInfo.h"
+#include "MessageDelayPools.h"
 #endif
 #if USE_OPENSSL
 #include "ssl/bio.h"
 #include "ssl/context_storage.h"
 #include "ssl/gadgets.h"
 #include "ssl/helper.h"
 #include "ssl/ProxyCerts.h"
 #include "ssl/ServerBump.h"
 #include "ssl/support.h"
 #endif
 
 // for tvSubUsec() which should be in SquidTime.h
 #include "util.h"
 
 #include <climits>
 #include <cmath>
 #include <limits>
 
 #if LINGERING_CLOSE
 #define comm_close comm_lingering_close
 #endif
 
 /// dials clientListenerConnectionOpened call
 class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb
 {
 public:
     typedef void (*Handler)(AnyP::PortCfgPointer &portCfg, const Ipc::FdNoteId note, const Subscription::Pointer &sub);
     ListeningStartedDialer(Handler aHandler, AnyP::PortCfgPointer &aPortCfg, const Ipc::FdNoteId note, const Subscription::Pointer &aSub):
         handler(aHandler), portCfg(aPortCfg), portTypeNote(note), sub(aSub) {}
 
@@ -2461,89 +2462,89 @@
     AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, ConnStateData::connStateClosed);
     comm_add_close_handler(clientConnection->fd, call);
 
     if (Config.onoff.log_fqdn)
         fqdncache_gethostbyaddr(clientConnection->remote, FQDN_LOOKUP_IF_MISS);
 
 #if USE_IDENT
     if (Ident::TheConfig.identLookup) {
         ACLFilledChecklist identChecklist(Ident::TheConfig.identLookup, NULL, NULL);
         identChecklist.src_addr = clientConnection->remote;
         identChecklist.my_addr = clientConnection->local;
         if (identChecklist.fastCheck() == ACCESS_ALLOWED)
             Ident::Start(clientConnection, clientIdentDone, this);
     }
 #endif
 
     clientdbEstablished(clientConnection->remote, 1);
 
     needProxyProtocolHeader_ = port->flags.proxySurrogate;
     if (needProxyProtocolHeader_) {
         if (!proxyProtocolValidateClient()) // will close the connection on failure
             return;
     }
 
 #if USE_DELAY_POOLS
     fd_table[clientConnection->fd].clientInfo = NULL;
 
     if (Config.onoff.client_db) {
         /* it was said several times that client write limiter does not work if client_db is disabled */
 
-        ClientDelayPools& pools(Config.ClientDelay.pools);
+        auto &pools = ClientDelayPools::Instance()->pools;
         ACLFilledChecklist ch(NULL, NULL, NULL);
 
         // TODO: we check early to limit error response bandwidth but we
         // should recheck when we can honor delay_pool_uses_indirect
         // TODO: we should also pass the port details for myportname here.
         ch.src_addr = clientConnection->remote;
         ch.my_addr = clientConnection->local;
 
         for (unsigned int pool = 0; pool < pools.size(); ++pool) {
 
             /* pools require explicit 'allow' to assign a client into them */
-            if (pools[pool].access) {
-                ch.changeAcl(pools[pool].access);
+            if (pools[pool]->access) {
+                ch.changeAcl(pools[pool]->access);
                 allow_t answer = ch.fastCheck();
                 if (answer == ACCESS_ALLOWED) {
 
                     /*  request client information from db after we did all checks
                         this will save hash lookup if client failed checks */
                     ClientInfo * cli = clientdbGetInfo(clientConnection->remote);
                     assert(cli);
 
                     /* put client info in FDE */
                     fd_table[clientConnection->fd].clientInfo = cli;
 
                     /* setup write limiter for this request */
                     const double burst = floor(0.5 +
-                                               (pools[pool].highwatermark * Config.ClientDelay.initial)/100.0);
-                    cli->setWriteLimiter(pools[pool].rate, burst, pools[pool].highwatermark);
+                                               (pools[pool]->highwatermark * Config.ClientDelay.initial)/100.0);
+                    cli->setWriteLimiter(pools[pool]->rate, burst, pools[pool]->highwatermark);
                     break;
                 } else {
                     debugs(83, 4, HERE << "Delay pool " << pool << " skipped because ACL " << answer);
                 }
             }
         }
     }
 #endif
 
     // kids must extend to actually start doing something (e.g., reading)
 }
 
 /** Handle a new connection on an HTTP socket. */
 void
 httpAccept(const CommAcceptCbParams &params)
 {
     MasterXaction::Pointer xact = params.xaction;
     AnyP::PortCfgPointer s = xact->squidPort;
 
     // NP: it is possible the port was reconfigured when the call or accept() was queued.
 
     if (params.flag != Comm::OK) {
         // It's possible the call was still queued when the client disconnected
         debugs(33, 2, s->listenConn << ": accept failure: " << xstrerr(params.xerrno));
         return;
     }
 
     debugs(33, 4, params.conn << ": accepted");
     fd_note(params.conn->fd, "client http connect");
 

=== modified file 'src/client_side.h'
--- src/client_side.h	2017-01-30 12:46:15 +0000
+++ src/client_side.h	2017-01-30 22:18:05 +0000
@@ -1,59 +1,62 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 /* DEBUG: section 33    Client-side Routines */
 
 #ifndef SQUID_CLIENTSIDE_H
 #define SQUID_CLIENTSIDE_H
 
 #include "base/RunnersRegistry.h"
 #include "clientStreamForward.h"
 #include "comm.h"
 #include "helper/forward.h"
 #include "http/forward.h"
 #include "HttpControlMsg.h"
 #include "ipc/FdNotes.h"
 #include "sbuf/SBuf.h"
 #include "servers/Server.h"
 #if USE_AUTH
 #include "auth/UserRequest.h"
 #endif
 #if USE_OPENSSL
 #include "security/Handshake.h"
 #include "ssl/support.h"
 #endif
+#if USE_DELAY_POOLS
+#include "MessageBucket.h"
+#endif
 
 class ClientHttpRequest;
 class HttpHdrRangeSpec;
 
 #if USE_OPENSSL
 namespace Ssl
 {
 class ServerBump;
 }
 #endif
 
 /**
  * Legacy Server code managing a connection to a client.
  *
  * NP: presents AsyncJob API but does not operate autonomously as a Job.
  *     So Must() is not safe to use.
  *
  * Multiple requests (up to pipeline_prefetch) can be pipelined.
  * This object is responsible for managing which one is currently being
  * fulfilled and what happens to the queue if the current one causes the client
  * connection to be closed early.
  *
  * Act as a manager for the client connection and passes data in buffer to a
  * Parser relevant to the state (message headers vs body) that is being
  * processed.
  *
  * Performs HTTP message processing to kick off the actual HTTP request
  * handling objects (Http::Stream, ClientHttpRequest, HttpRequest).
  *
  * Performs SSL-Bump processing for switching between HTTP and HTTPS protocols.

=== modified file 'src/comm.cc'
--- src/comm.cc	2017-01-01 00:12:22 +0000
+++ src/comm.cc	2017-01-30 13:46:37 +0000
@@ -889,67 +889,63 @@
     PROF_start(comm_close);
 
     F->flags.close_request = true;
 
 #if USE_OPENSSL
     if (F->ssl) {
         AsyncCall::Pointer startCall=commCbCall(5,4, "commStartSslClose",
                                                 FdeCbPtrFun(commStartSslClose, NULL));
         FdeCbParams &startParams = GetCommParams<FdeCbParams>(startCall);
         startParams.fd = fd;
         ScheduleCallHere(startCall);
     }
 #endif
 
     // a half-closed fd may lack a reader, so we stop monitoring explicitly
     if (commHasHalfClosedMonitor(fd))
         commStopHalfClosedMonitor(fd);
     commUnsetFdTimeout(fd);
 
     // notify read/write handlers after canceling select reservations, if any
     if (COMMIO_FD_WRITECB(fd)->active()) {
         Comm::SetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
         COMMIO_FD_WRITECB(fd)->finish(Comm::ERR_CLOSING, errno);
     }
     if (COMMIO_FD_READCB(fd)->active()) {
         Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
         COMMIO_FD_READCB(fd)->finish(Comm::ERR_CLOSING, errno);
     }
 
 #if USE_DELAY_POOLS
-    if (ClientInfo *clientInfo = F->clientInfo) {
-        if (clientInfo->selectWaiting) {
-            clientInfo->selectWaiting = false;
-            // kick queue or it will get stuck as commWriteHandle is not called
-            clientInfo->kickQuotaQueue();
-        }
-    }
+    if (BandwidthBucket *bucket = BandwidthBucket::SelectBucket(F))
+        if (bucket->selectWaiting)
+            bucket->onFdClosed();
 #endif
 
     commCallCloseHandlers(fd);
 
     comm_empty_os_read_buffers(fd);
 
     AsyncCall::Pointer completeCall=commCbCall(5,4, "comm_close_complete",
                                     FdeCbPtrFun(comm_close_complete, NULL));
     FdeCbParams &completeParams = GetCommParams<FdeCbParams>(completeCall);
     completeParams.fd = fd;
     // must use async call to wait for all callbacks
     // scheduled before comm_close() to finish
     ScheduleCallHere(completeCall);
 
     PROF_stop(comm_close);
 }
 
 /* Send a udp datagram to specified TO_ADDR. */
 int
 comm_udp_sendto(int fd,
                 const Ip::Address &to_addr,
                 const void *buf,
                 int len)
 {
     PROF_start(comm_udp_sendto);
     ++ statCounter.syscalls.sock.sendtos;
 
     debugs(50, 3, "comm_udp_sendto: Attempt to send UDP packet to " << to_addr <<
            " using FD " << fd << " using Port " << comm_local_port(fd) );
 
@@ -1324,195 +1320,197 @@
 
 /// returns the reservation ID of the first descriptor to be dequeued
 unsigned int
 ClientInfo::quotaPeekReserv() const
 {
     assert(quotaQueue);
     return quotaQueue->outs + 1;
 }
 
 /// queues a given fd, creating the queue if necessary; returns reservation ID
 unsigned int
 ClientInfo::quotaEnqueue(int fd)
 {
     assert(quotaQueue);
     return quotaQueue->enqueue(fd);
 }
 
 /// removes queue head
 void
 ClientInfo::quotaDequeue()
 {
     assert(quotaQueue);
     quotaQueue->dequeue();
 }
 
 void
 ClientInfo::kickQuotaQueue()
 {
     if (!eventWaiting && !selectWaiting && hasQueue()) {
         // wait at least a second if the bucket is empty
-        const double delay = (bucketSize < 1.0) ? 1.0 : 0.0;
+        const double delay = (bucketLevel < 1.0) ? 1.0 : 0.0;
         eventAdd("commHandleWriteHelper", &commHandleWriteHelper,
                  quotaQueue, delay, 0, true);
         eventWaiting = true;
     }
 }
 
 /// calculates how much to write for a single dequeued client
 int
-ClientInfo::quotaForDequed()
+ClientInfo::quota()
 {
     /* If we have multiple clients and give the full bucketLevel to each then
      * clt1 may often get a lot more because the clt1->clt2 time distance in
      * the select(2) callback order may be a lot smaller than the cltN->clt1
      * distance. We divide the quota evenly to be more fair. */
 
     if (!rationedCount) {
         rationedCount = quotaQueue->size() + 1;
 
         // The delay in ration recalculation _temporarily_ deprives clients of
         // bytes that should have trickled in while rationedCount was positive.
         refillBucket();
 
         // Rounding errors do not accumulate here, but we round down to avoid
         // negative bucket sizes after write with rationedCount=1.
-        rationedQuota = static_cast<int>(floor(bucketSize/rationedCount));
+        rationedQuota = static_cast<int>(floor(bucketLevel/rationedCount));
         debugs(77,5, HERE << "new rationedQuota: " << rationedQuota <<
                '*' << rationedCount);
     }
 
     --rationedCount;
     debugs(77,7, HERE << "rationedQuota: " << rationedQuota <<
            " rations remaining: " << rationedCount);
 
     // update 'last seen' time to prevent clientdb GC from dropping us
     last_seen = squid_curtime;
     return rationedQuota;
 }
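
A worked example of the rationing above, with invented numbers:

// invented figures, only to illustrate the quota() math:
//   bucketLevel        = 1001.0  (bytes currently available)
//   quotaQueue->size() = 3       (clients queued behind this one)
// then, on the first dequeue after a refill:
//   rationedCount = 3 + 1             = 4
//   rationedQuota = floor(1001.0 / 4) = 250
// so the next four dequeued writes get at most 250 bytes each; rounding
// down keeps bucketLevel non-negative after the last of them.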
 
-///< adds bytes to the quota bucket based on the rate and passed time
-void
-ClientInfo::refillBucket()
-{
-    // all these times are in seconds, with double precision
-    const double currTime = current_dtime;
-    const double timePassed = currTime - prevTime;
-
-    // Calculate allowance for the time passed. Use double to avoid
-    // accumulating rounding errors for small intervals. For example, always
-    // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error.
-    const double gain = timePassed * writeSpeedLimit;
-
-    debugs(77,5, HERE << currTime << " clt" << (const char*)hash.key << ": " <<
-           bucketSize << " + (" << timePassed << " * " << writeSpeedLimit <<
-           " = " << gain << ')');
-
-    // to further combat error accumulation during micro updates,
-    // quit before updating time if we cannot add at least one byte
-    if (gain < 1.0)
-        return;
-
-    prevTime = currTime;
-
-    // for "first" connections, drain initial fat before refilling but keep
-    // updating prevTime to avoid bursts after the fat is gone
-    if (bucketSize > bucketSizeLimit) {
-        debugs(77,4, HERE << "not refilling while draining initial fat");
-        return;
-    }
-
-    bucketSize += gain;
-
-    // obey quota limits
-    if (bucketSize > bucketSizeLimit)
-        bucketSize = bucketSizeLimit;
+bool
+ClientInfo::applyQuota(int &nleft, Comm::IoCallback *state)
+{
+    assert(hasQueue());
+    assert(quotaPeekFd() == state->conn->fd);
+    quotaDequeue(); // we will write or requeue below
+    if (nleft > 0 && !BandwidthBucket::applyQuota(nleft, state)) {
+        state->quotaQueueReserv = quotaEnqueue(state->conn->fd);
+        kickQuotaQueue();
+        return false;
+    }
+    return true;
+}
+
+void
+ClientInfo::scheduleWrite(Comm::IoCallback *state)
+{
+    if (writeLimitingActive) {
+        state->quotaQueueReserv = quotaEnqueue(state->conn->fd);
+        kickQuotaQueue();
+    }
+}
+
+void
+ClientInfo::onFdClosed()
+{
+    BandwidthBucket::onFdClosed();
+    // kick queue or it will get stuck as Comm::HandleWrite is not called
+    kickQuotaQueue();
+}
+
+void
+ClientInfo::reduceBucket(const int len)
+{
+    if (len > 0)
+        BandwidthBucket::reduceBucket(len);
+    // even if we wrote nothing, we were served; give others a chance
+    kickQuotaQueue();
 }
 
 void
 ClientInfo::setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark)
 {
-    debugs(77,5, HERE << "Write limits for " << (const char*)hash.key <<
+    debugs(77,5, "Write limits for " << (const char*)key <<
            " speed=" << aWriteSpeedLimit << " burst=" << anInitialBurst <<
            " highwatermark=" << aHighWatermark);
 
     // set or possibly update traffic shaping parameters
     writeLimitingActive = true;
     writeSpeedLimit = aWriteSpeedLimit;
     bucketSizeLimit = aHighWatermark;
 
     // but some members should only be set once for a newly activated bucket
     if (firstTimeConnection) {
         firstTimeConnection = false;
 
         assert(!selectWaiting);
         assert(!quotaQueue);
         quotaQueue = new CommQuotaQueue(this);
 
-        bucketSize = anInitialBurst;
+        bucketLevel = anInitialBurst;
         prevTime = current_dtime;
     }
 }
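
For concreteness, the burst computed by the client_side.cc hunk that feeds
this setter, with invented configuration values:

// invented values for a matched client-side pool:
//   rate (restore speed)        = 50000  bytes/sec
//   highwatermark (bucket cap)  = 100000 bytes
//   Config.ClientDelay.initial  = 50     percent
// burst = floor(0.5 + 100000 * 50 / 100.0) = 50000
// so a first-time client starts with bucketLevel = 50000 bytes and then
// refills at 50000 bytes/sec up to the 100000-byte cap.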
 
 CommQuotaQueue::CommQuotaQueue(ClientInfo *info): clientInfo(info),
     ins(0), outs(0)
 {
     assert(clientInfo);
 }
 
 CommQuotaQueue::~CommQuotaQueue()
 {
     assert(!clientInfo); // ClientInfo should clear this before destroying us
 }
 
 /// places the given fd at the end of the queue; returns reservation ID
 unsigned int
 CommQuotaQueue::enqueue(int fd)
 {
-    debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+    debugs(77,5, "clt" << (const char*)clientInfo->key <<
            ": FD " << fd << " with qqid" << (ins+1) << ' ' << fds.size());
     fds.push_back(fd);
     return ++ins;
 }
 
 /// removes queue head
 void
 CommQuotaQueue::dequeue()
 {
     assert(!fds.empty());
-    debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+    debugs(77,5, "clt" << (const char*)clientInfo->key <<
            ": FD " << fds.front() << " with qqid" << (outs+1) << ' ' <<
            fds.size());
     fds.pop_front();
     ++outs;
 }
-#endif
+#endif /* USE_DELAY_POOLS */
 
 /*
  * hm, this might be too general-purpose for all the places we'd
  * like to use it.
  */
 int
 ignoreErrno(int ierrno)
 {
     switch (ierrno) {
 
     case EINPROGRESS:
 
     case EWOULDBLOCK:
 #if EAGAIN != EWOULDBLOCK
 
     case EAGAIN:
 #endif
 
     case EALREADY:
 
     case EINTR:
 #ifdef ERESTART
 
     case ERESTART:
 #endif
 
         return 1;
 
     default:
         return 0;
@@ -1566,61 +1564,70 @@
     return false;
 }
 
 static bool
 writeTimedOut(int fd)
 {
     if (!COMMIO_FD_WRITECB(fd)->active())
         return false;
 
     if ((squid_curtime - fd_table[fd].writeStart) < Config.Timeout.write)
         return false;
 
     return true;
 }
 
 void
 checkTimeouts(void)
 {
     int fd;
     fde *F = NULL;
     AsyncCall::Pointer callback;
 
     for (fd = 0; fd <= Biggest_FD; ++fd) {
         F = &fd_table[fd];
 
         if (writeTimedOut(fd)) {
             // We have an active write callback and we are timed out
             debugs(5, 5, "checkTimeouts: FD " << fd << " auto write timeout");
             Comm::SetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
             COMMIO_FD_WRITECB(fd)->finish(Comm::COMM_ERROR, ETIMEDOUT);
-        } else if (AlreadyTimedOut(F))
+#if USE_DELAY_POOLS
+        } else if (F->writeQuotaHandler != nullptr && COMMIO_FD_WRITECB(fd)->conn != nullptr) {
+            if (!F->writeQuotaHandler->selectWaiting && F->writeQuotaHandler->quota() && !F->closing()) {
+                F->writeQuotaHandler->selectWaiting = true;
+                Comm::SetSelect(fd, COMM_SELECT_WRITE, Comm::HandleWrite, COMMIO_FD_WRITECB(fd), 0);
+            }
+            continue;
+#endif
+        } else if (AlreadyTimedOut(F))
             continue;
 
         debugs(5, 5, "checkTimeouts: FD " << fd << " Expired");
 
         if (F->timeoutHandler != NULL) {
             debugs(5, 5, "checkTimeouts: FD " << fd << ": Call timeout handler");
             callback = F->timeoutHandler;
             F->timeoutHandler = NULL;
             ScheduleCallHere(callback);
         } else {
             debugs(5, 5, "checkTimeouts: FD " << fd << ": Forcing comm_close()");
             comm_close(fd);
         }
     }
 }
 
 /// Start waiting for a possibly half-closed connection to close
 // by scheduling a read callback to a monitoring handler that
 // will close the connection on read errors.
 void
 commStartHalfClosedMonitor(int fd)
 {
     debugs(5, 5, HERE << "adding FD " << fd << " to " << *TheHalfClosed);
     assert(isOpen(fd) && !commHasHalfClosedMonitor(fd));
     (void)TheHalfClosed->add(fd); // could also assert the result
     commPlanHalfClosedCheck(); // may schedule check if we added the first FD
 }
 
 static
 void
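
The comm.cc changes above program against the new BandwidthBucket base that
ClientInfo (and, on the response side, MessageBucket) implement. The method
set below is inferred from the call sites in this patch; treat it as a
sketch, not the actual header:

// inferred sketch; the real class ships with the patch as BandwidthBucket.{h,cc}
class BandwidthBucket
{
public:
    /// the limiter attached to this descriptor, if any
    static BandwidthBucket *SelectBucket(fde *f);

    virtual ~BandwidthBucket() {}
    /// trims nleft to the available quota; false means "requeued, do not write"
    virtual bool applyQuota(int &nleft, Comm::IoCallback *state);
    /// drains the written bytes from the bucket
    virtual void reduceBucket(const int len);
    /// queues the pending write or arms select(2), depending on the limiter
    virtual void scheduleWrite(Comm::IoCallback *state) = 0;
    /// detaches from a descriptor that is being closed
    virtual void onFdClosed();
    /// bytes the bucket allows to be written right now
    virtual int quota() = 0;

    bool selectWaiting = false; ///< waiting for the write handler to fire

protected:
    double bucketLevel = 0.0;   ///< how much may be written at this moment
};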

=== modified file 'src/comm/IoCallback.cc'
--- src/comm/IoCallback.cc	2017-01-01 00:12:22 +0000
+++ src/comm/IoCallback.cc	2017-01-30 13:46:36 +0000
@@ -42,67 +42,63 @@
 }
 
 /**
  * Configure Comm::Callback for I/O
  *
  * @param fd            filedescriptor
  * @param t             IO callback type (read or write)
  * @param cb            callback
  * @param buf           buffer, if applicable
  * @param func          freefunc, if applicable
  * @param sz            buffer size
  */
 void
 Comm::IoCallback::setCallback(Comm::iocb_type t, AsyncCall::Pointer &cb, char *b, FREE *f, int sz)
 {
     assert(!active());
     assert(type == t);
     assert(cb != NULL);
 
     callback = cb;
     buf = b;
     freefunc = f;
     size = sz;
     offset = 0;
 }
 
 void
 Comm::IoCallback::selectOrQueueWrite()
 {
 #if USE_DELAY_POOLS
-    // stand in line if there is one
-    if (ClientInfo *clientInfo = fd_table[conn->fd].clientInfo) {
-        if (clientInfo->writeLimitingActive) {
-            quotaQueueReserv = clientInfo->quotaEnqueue(conn->fd);
-            clientInfo->kickQuotaQueue();
-            return;
-        }
+    if (BandwidthBucket *bucket = BandwidthBucket::SelectBucket(&fd_table[conn->fd])) {
+        bucket->scheduleWrite(this);
+        return;
     }
 #endif
 
     SetSelect(conn->fd, COMM_SELECT_WRITE, Comm::HandleWrite, this, 0);
 }
 
 void
 Comm::IoCallback::cancel(const char *reason)
 {
     if (!active())
         return;
 
     callback->cancel(reason);
     callback = NULL;
     reset();
 }
 
 void
 Comm::IoCallback::reset()
 {
     conn = NULL;
     if (freefunc) {
         freefunc(buf);
         buf = NULL;
         freefunc = NULL;
     }
     xerrno = 0;
 
 #if USE_DELAY_POOLS
     quotaQueueReserv = 0;

=== modified file 'src/comm/Loops.h'
--- src/comm/Loops.h	2017-01-01 00:12:22 +0000
+++ src/comm/Loops.h	2017-01-30 13:46:36 +0000
@@ -1,59 +1,56 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef _SQUID_SRC_COMM_LOOPS_H
 #define _SQUID_SRC_COMM_LOOPS_H
 
 #include "comm/Flag.h"
 #include "comm/forward.h"
 
 /* Comm layer select loops API.
  *
  * These API functions must be implemented by all FD IO loops used by Squid.
  * Defines are provided short-term for legacy code. These will disappear soon.
  */
 
 namespace Comm
 {
 
 /// Initialize the module on Squid startup
 void SelectLoopInit(void);
 
-/// Mark an FD to be watched for its IO status.
-void SetSelect(int, unsigned int, PF *, void *, time_t);
-
 /// reset/undo/unregister the watch for an FD which was set by Comm::SetSelect()
 void ResetSelect(int);
 
 /** Perform a select() or equivalent call.
  * This is used by the main select loop engine to check for FD with IO available.
  */
 Comm::Flag DoSelect(int);
 
 void QuickPollRequired(void);
 
 /**
  * Max number of UDP messages to receive per call to the UDP receive poller.
  * This is a per-port limit for ICP/HTCP ports.
  * DNS has a separate limit.
  */
 #if _SQUID_WINDOWS_
 #define INCOMING_UDP_MAX 1
 #else
 #define INCOMING_UDP_MAX 15
 #endif
 
 /**
  * Max number of DNS messages to receive per call to DNS read handler
  */
 #if _SQUID_WINDOWS_
 #define INCOMING_DNS_MAX 1
 #else
 #define INCOMING_DNS_MAX 15
 #endif
 

=== modified file 'src/comm/Write.cc'
--- src/comm/Write.cc	2017-01-01 00:12:22 +0000
+++ src/comm/Write.cc	2017-01-30 13:46:37 +0000
@@ -1,152 +1,124 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #include "squid.h"
+#include "cbdata.h"
 #include "comm/Connection.h"
 #include "comm/IoCallback.h"
+#include "comm/Loops.h"
 #include "comm/Write.h"
 #include "fd.h"
 #include "fde.h"
 #include "globals.h"
 #include "MemBuf.h"
 #include "profiler/Profiler.h"
 #include "SquidTime.h"
 #include "StatCounters.h"
 #if USE_DELAY_POOLS
 #include "ClientInfo.h"
 #endif
 
 #include <cerrno>
 
 void
 Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback)
 {
     Comm::Write(conn, mb->buf, mb->size, callback, mb->freeFunc());
 }
 
 void
 Comm::Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
 {
     debugs(5, 5, HERE << conn << ": sz " << size << ": asyncCall " << callback);
 
     /* Make sure we are open, not closing, and not writing */
     assert(fd_table[conn->fd].flags.open);
     assert(!fd_table[conn->fd].closing());
     Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd);
     assert(!ccb->active());
 
     fd_table[conn->fd].writeStart = squid_curtime;
     ccb->conn = conn;
     /* Queue the write */
     ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size);
     ccb->selectOrQueueWrite();
 }
 
 /** Write to FD.
  * This function is used by the lowest level of IO loop which only has access to FD numbers.
  * We have to use the comm iocb_table to map FD numbers to waiting data and Comm::Connections.
  * Once the write has been concluded we schedule the waiting call with success/fail results.
  */
 void
 Comm::HandleWrite(int fd, void *data)
 {
     Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data);
     int len = 0;
     int nleft;
 
-    assert(state->conn != NULL && state->conn->fd == fd);
+    assert(state->conn != NULL);
+    assert(state->conn->fd == fd);
 
     PROF_start(commHandleWrite);
     debugs(5, 5, HERE << state->conn << ": off " <<
            (long int) state->offset << ", sz " << (long int) state->size << ".");
 
     nleft = state->size - state->offset;
 
 #if USE_DELAY_POOLS
-    ClientInfo * clientInfo=fd_table[fd].clientInfo;
-
-    if (clientInfo && !clientInfo->writeLimitingActive)
-        clientInfo = NULL; // we only care about quota limits here
-
-    if (clientInfo) {
-        assert(clientInfo->selectWaiting);
-        clientInfo->selectWaiting = false;
-
-        assert(clientInfo->hasQueue());
-        assert(clientInfo->quotaPeekFd() == fd);
-        clientInfo->quotaDequeue(); // we will write or requeue below
-
-        if (nleft > 0) {
-            const int quota = clientInfo->quotaForDequed();
-            if (!quota) {  // if no write quota left, queue this fd
-                state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
-                clientInfo->kickQuotaQueue();
-                PROF_stop(commHandleWrite);
-                return;
-            }
-
-            const int nleft_corrected = min(nleft, quota);
-            if (nleft != nleft_corrected) {
-                debugs(5, 5, HERE << state->conn << " writes only " <<
-                       nleft_corrected << " out of " << nleft);
-                nleft = nleft_corrected;
-            }
-
+    BandwidthBucket *bucket = BandwidthBucket::SelectBucket(&fd_table[fd]);
+    if (bucket) {
+        assert(bucket->selectWaiting);
+        bucket->selectWaiting = false;
+        if (nleft > 0 && !bucket->applyQuota(nleft, state)) {
+            PROF_stop(commHandleWrite);
+            return;
         }
     }
 #endif /* USE_DELAY_POOLS */
 
     /* actually WRITE data */
     int xerrno = errno = 0;
     len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
     xerrno = errno;
     debugs(5, 5, HERE << "write() returns " << len);
 
 #if USE_DELAY_POOLS
-    if (clientInfo) {
-        if (len > 0) {
-            /* we wrote data - drain them from bucket */
-            clientInfo->bucketSize -= len;
-            if (clientInfo->bucketSize < 0.0) {
-                debugs(5, DBG_IMPORTANT, HERE << "drained too much"); // should not happen
-                clientInfo->bucketSize = 0;
-            }
-        }
-
-        // even if we wrote nothing, we were served; give others a chance
-        clientInfo->kickQuotaQueue();
+    if (bucket) {
+        /* drain the written bytes, if any, from the bucket */
+        bucket->reduceBucket(len);
     }
 #endif /* USE_DELAY_POOLS */
 
     fd_bytes(fd, len, FD_WRITE);
     ++statCounter.syscalls.sock.writes;
     // After each successful partial write,
     // reset fde::writeStart to the current time.
     fd_table[fd].writeStart = squid_curtime;
 
     if (len == 0) {
         /* Note we even call write if nleft == 0 */
         /* We're done */
         if (nleft != 0)
             debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining.");
 
         state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, 0);
     } else if (len < 0) {
         /* An error */
         if (fd_table[fd].flags.socket_eof) {
             debugs(50, 2, "FD " << fd << " write failure: " << xstrerr(xerrno) << ".");
             state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, xerrno);
         } else if (ignoreErrno(xerrno)) {
             debugs(50, 9, "FD " << fd << " write failure: " << xstrerr(xerrno) << ".");
             state->selectOrQueueWrite();
         } else {
             debugs(50, 2, "FD " << fd << " write failure: " << xstrerr(xerrno) << ".");
             state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, xerrno);
         }
     } else {
         /* A successful write, continue */

=== modified file 'src/comm/Write.h'
--- src/comm/Write.h	2017-01-01 00:12:22 +0000
+++ src/comm/Write.h	2017-01-30 13:46:37 +0000
@@ -7,37 +7,34 @@
  */
 
 #ifndef _SQUID_COMM_IOWRITE_H
 #define _SQUID_COMM_IOWRITE_H
 
 #include "base/AsyncCall.h"
 #include "comm/forward.h"
 #include "mem/forward.h"
 
 class MemBuf;
 namespace Comm
 {
 
 /**
  * Queue a write. callback is scheduled when the write
  * completes, on error, or on file descriptor close.
  *
  * free_func is used to free the passed buffer when the write has completed.
  */
 void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func);
 
 /**
  * Queue a write. callback is scheduled when the write
  * completes, on error, or on file descriptor close.
  */
 void Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback);
 
 /// Cancel the write pending on FD. No action if none pending.
 void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason);
 
-// callback handler to process an FD which is available for writing.
-extern PF HandleWrite;
-
 } // namespace Comm
 
 #endif /* _SQUID_COMM_IOWRITE_H */
 

=== modified file 'src/comm/forward.h'
--- src/comm/forward.h	2017-01-01 00:12:22 +0000
+++ src/comm/forward.h	2017-01-30 13:46:37 +0000
@@ -1,36 +1,42 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef _SQUID_COMM_FORWARD_H
 #define _SQUID_COMM_FORWARD_H
 
 #include "base/RefCount.h"
 
 #include <vector>
 
+/// legacy CBDATA callback functions ABI definition for read or write I/O events
+/// \deprecated use CommCalls API instead where possible
+typedef void PF(int, void *);
+
 /// Abstraction layer for TCP, UDP, TLS, UDS and filedescriptor sockets.
 namespace Comm
 {
 
 class Connection;
 class ConnOpener;
 
 typedef RefCount<Comm::Connection> ConnectionPointer;
 
 typedef std::vector<Comm::ConnectionPointer> ConnectionList;
 
 bool IsConnOpen(const Comm::ConnectionPointer &conn);
 
+// callback handler to process an FD which is available for writing.
+PF HandleWrite;
+
+/// Mark an FD to be watched for its IO status.
+void SetSelect(int, unsigned int, PF *, void *, time_t);
+
 }; // namespace Comm
 
-/// legacy CBDATA callback functions ABI definition for read or write I/O events
-/// \deprecated use CommCalls API instead where possible
-typedef void PF(int, void *);
-
 #endif /* _SQUID_COMM_FORWARD_H */
 

=== modified file 'src/fde.h'
--- src/fde.h	2017-01-01 00:12:22 +0000
+++ src/fde.h	2017-01-30 13:46:42 +0000
@@ -1,49 +1,50 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef SQUID_FDE_H
 #define SQUID_FDE_H
 
 #include "comm.h"
 #include "defines.h"
 #include "ip/Address.h"
 #include "ip/forward.h"
 #include "security/forward.h"
 #include "typedefs.h" //DRCB, DWCB
 
 #if USE_DELAY_POOLS
+#include "MessageBucket.h"
 class ClientInfo;
 #endif
 
 /**
  * READ_HANDLER functions return < 0 if, and only if, they fail with an error.
  * On error, they must pass back an error code in 'errno'.
  */
 typedef int READ_HANDLER(int, char *, int);
 
 /**
  * WRITE_HANDLER functions return < 0 if, and only if, they fail with an error.
  * On error, they must pass back an error code in 'errno'.
  */
 typedef int WRITE_HANDLER(int, const char *, int);
 
 class dwrite_q;
 class _fde_disk
 {
 public:
     DWCB *wrt_handle;
     void *wrt_handle_data;
     dwrite_q *write_q;
     dwrite_q *write_q_tail;
     off_t offset;
     _fde_disk() { memset(this, 0, sizeof(_fde_disk)); }
 };
 
 class fde
 {
 
@@ -76,110 +77,112 @@
                                         See also nfmarkFromServer. */
     int sock_family;
     char ipaddr[MAX_IPSTRLEN];            /* dotted decimal address of peer */
     char desc[FD_DESC_SZ];
 
     struct _fde_flags {
         bool open;
         bool close_request; ///< true if file_ or comm_close has been called
         bool write_daemon;
         bool socket_eof;
         bool nolinger;
         bool nonblocking;
         bool ipc;
         bool called_connect;
         bool nodelay;
         bool close_on_exec;
         bool read_pending;
         //bool write_pending; //XXX seems not to be used
         bool transparent;
     } flags;
 
     int64_t bytes_read;
     int64_t bytes_written;
 
     struct {
         int uses;                   /* ie # req's over persistent conn */
     } pconn;
 
 #if USE_DELAY_POOLS
     ClientInfo * clientInfo; /* pointer to client info used in client write limiter or NULL if not present */
+    MessageBucket::Pointer writeQuotaHandler; ///< response write limiter, if configured
 #endif
     unsigned epoll_state;
 
     _fde_disk disk;
     PF *read_handler;
     void *read_data;
     PF *write_handler;
     void *write_data;
     AsyncCall::Pointer timeoutHandler;
     time_t timeout;
     time_t writeStart;
     void *lifetime_data;
     AsyncCall::Pointer closeHandler;
     AsyncCall::Pointer halfClosedReader; /// read handler for half-closed fds
     READ_HANDLER *read_method;
     WRITE_HANDLER *write_method;
     Security::SessionPointer ssl;
     Security::ContextPointer dynamicTlsContext; ///< cached and then freed when fd is closed
 #if _SQUID_WINDOWS_
     struct {
         long handle;
     } win32;
 #endif
     tos_t tosFromServer;                /**< Stores the TOS flags of the packets from the remote server.
                                             See FwdState::dispatch(). Note that this differs from
                                             tosToServer in that this is the value we *receive* from the
                                             connection, whereas tosToServer is the value to set on packets
                                             *leaving* Squid.  */
     unsigned int nfmarkFromServer;      /**< Stores the Netfilter mark value of the connection from the remote
                                             server. See FwdState::dispatch(). Note that this differs from
                                             nfmarkToServer in that this is the value we *receive* from the
                                             connection, whereas nfmarkToServer is the value to set on packets
                                             *leaving* Squid.   */
 
     /** Clear the fde class back to NULL equivalent. */
     inline void clear() {
         type = 0;
         remote_port = 0;
         local_addr.setEmpty();
         tosToServer = '\0';
         nfmarkToServer = 0;
         sock_family = 0;
         memset(ipaddr, '\0', MAX_IPSTRLEN);
         memset(desc,'\0',FD_DESC_SZ);
         memset(&flags,0,sizeof(_fde_flags));
         bytes_read = 0;
         bytes_written = 0;
         pconn.uses = 0;
 #if USE_DELAY_POOLS
         clientInfo = NULL;
+        writeQuotaHandler = NULL;
 #endif
         epoll_state = 0;
         read_handler = NULL;
         read_data = NULL;
         write_handler = NULL;
         write_data = NULL;
         timeoutHandler = NULL;
         timeout = 0;
         writeStart = 0;
         lifetime_data = NULL;
         closeHandler = NULL;
         halfClosedReader = NULL;
         read_method = NULL;
         write_method = NULL;
         ssl.reset();
         dynamicTlsContext.reset();
 #if _SQUID_WINDOWS_
         win32.handle = (long)NULL;
 #endif
         tosFromServer = '\0';
         nfmarkFromServer = 0;
     }
 };
 
 #define fd_table fde::Table
 
 int fdNFree(void);
 
 #define FD_READ_METHOD(fd, buf, len) (*fd_table[fd].read_method)(fd, buf, len)
 #define FD_WRITE_METHOD(fd, buf, len) (*fd_table[fd].write_method)(fd, buf, len)
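
fde now carries two shaping hooks side by side. A short sketch of the
ownership split (the SelectBucket dispatch is an assumption based on the
comm.cc call sites):

// ownership sketch, not actual code:
ClientInfo *clientInfo;                   // raw pointer; the client_db hash
                                          // owns the per-client record
MessageBucket::Pointer writeQuotaHandler; // ref-counted; per-response bucket
                                          // created from a response delay pool
// BandwidthBucket::SelectBucket(F) presumably returns whichever limiter
// applies to the descriptor, or nullptr when the fd is unshaped.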

=== modified file 'src/http/Stream.cc'
--- src/http/Stream.cc	2017-01-01 00:12:22 +0000
+++ src/http/Stream.cc	2017-01-30 13:46:49 +0000
@@ -1,45 +1,51 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #include "squid.h"
 #include "client_side_request.h"
 #include "http/Stream.h"
 #include "HttpHdrContRange.h"
 #include "HttpHeaderTools.h"
 #include "Store.h"
 #include "TimeOrTag.h"
+#if USE_DELAY_POOLS
+#include "acl/FilledChecklist.h"
+#include "ClientInfo.h"
+#include "fde.h"
+#include "MessageDelayPools.h"
+#endif
 
 Http::Stream::Stream(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
     clientConnection(aConn),
     http(aReq),
     reply(nullptr),
     writtenToSocket(0),
     mayUseConnection_(false),
     connRegistered_(false)
 {
     assert(http != nullptr);
     memset(reqbuf, '\0', sizeof (reqbuf));
     flags.deferred = 0;
     flags.parsed_ok = 0;
     deferredparams.node = nullptr;
     deferredparams.rep = nullptr;
 }
 
 Http::Stream::~Stream()
 {
     if (auto node = getTail()) {
         if (auto ctx = dynamic_cast<Http::Stream *>(node->data.getRaw())) {
             /* We are *always* the tail - prevent recursive free */
             assert(this == ctx);
             node->data = nullptr;
         }
     }
     httpRequestFree(http);
 }
 
 void
@@ -256,60 +262,78 @@
 }
 
 void
 Http::Stream::sendStartOfMessage(HttpReply *rep, StoreIOBuffer bodyData)
 {
     prepareReply(rep);
     assert(rep);
     MemBuf *mb = rep->pack();
 
     // dump now, so we don't output any body.
     debugs(11, 2, "HTTP Client " << clientConnection);
     debugs(11, 2, "HTTP Client REPLY:\n---------\n" << mb->buf << "\n----------");
 
     /* Save length of headers for persistent conn checks */
     http->out.headers_sz = mb->contentSize();
 #if HEADERS_LOG
     headersLog(0, 0, http->request->method, rep);
 #endif
 
     if (bodyData.data && bodyData.length) {
         if (multipartRangeRequest())
             packRange(bodyData, mb);
         else if (http->request->flags.chunkedReply) {
             packChunk(bodyData, *mb);
         } else {
             size_t length = lengthToSend(bodyData.range());
             noteSentBodyBytes(length);
             mb->append(bodyData.data, length);
         }
     }
+#if USE_DELAY_POOLS
+    for (const auto &pool: MessageDelayPools::Instance()->pools) {
+        if (pool->access) {
+            std::unique_ptr<ACLFilledChecklist> chl(clientAclChecklistCreate(pool->access, http));
+            chl->reply = rep;
+            HTTPMSGLOCK(chl->reply);
+            const allow_t answer = chl->fastCheck();
+            if (answer == ACCESS_ALLOWED) {
+                writeQuotaHandler = pool->createBucket();
+                fd_table[clientConnection->fd].writeQuotaHandler = writeQuotaHandler;
+                break;
+            } else {
+                debugs(83, 4, "Response delay pool " << pool->poolName <<
+                       " skipped because ACL " << answer);
+            }
+        }
+    }
+#endif
 
     getConn()->write(mb);
     delete mb;
 }
 
 void
 Http::Stream::sendBody(StoreIOBuffer bodyData)
 {
     if (!multipartRangeRequest() && !http->request->flags.chunkedReply) {
         size_t length = lengthToSend(bodyData.range());
         noteSentBodyBytes(length);
         getConn()->write(bodyData.data, length);
         return;
     }
 
     MemBuf mb;
     mb.init();
     if (multipartRangeRequest())
         packRange(bodyData, &mb);
     else
         packChunk(bodyData, mb);
 
     if (mb.contentSize())
         getConn()->write(&mb);
     else
         writeComplete(0);
 }
 
 size_t
 Http::Stream::lengthToSend(Range<int64_t> const &available) const
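
The pool-matching loop in sendStartOfMessage() above assumes roughly this
interface from MessageDelayPools.h. The member names come from the call
sites; everything else is a sketch:

// sketch; the real declarations are in src/MessageDelayPools.h
class MessageDelayPool : public RefCountable
{
public:
    typedef RefCount<MessageDelayPool> Pointer;
    SBuf poolName;                 ///< shown in the "skipped because ACL" debugs
    acl_access *access = nullptr;  ///< rules deciding who enters the pool
    /// a per-response bucket, presumably tied to the pool-wide aggregate
    MessageBucket::Pointer createBucket();
};

// singleton storage mirroring the client-side ClientDelayPools
class MessageDelayPools
{
public:
    static MessageDelayPools *Instance();
    std::vector<MessageDelayPool::Pointer> pools;
};

Note that the stream and the fde each keep a MessageBucket::Pointer to the
same bucket, so the quota state survives until the longer-lived of the two
drops its reference.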

=== modified file 'src/http/Stream.h'
--- src/http/Stream.h	2017-01-01 00:12:22 +0000
+++ src/http/Stream.h	2017-01-30 13:46:49 +0000
@@ -1,44 +1,47 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef SQUID_SRC_HTTP_STREAM_H
 #define SQUID_SRC_HTTP_STREAM_H
 
 #include "http/forward.h"
 #include "mem/forward.h"
 #include "StoreIOBuffer.h"
+#if USE_DELAY_POOLS
+#include "MessageBucket.h"
+#endif
 
 class clientStreamNode;
 class ClientHttpRequest;
 
 namespace Http
 {
 
 /**
  * The processing context for a single HTTP transaction (stream).
  *
  * A stream lifetime extends from directly after a request has been parsed
  * off the client connection buffer, until the last byte of both request
  * and reply payload (if any) have been written, or it is otherwise
  * explicitly terminated.
  *
  * Streams self-register with the Http::Server Pipeline being managed by the
  * Server for the connection on which the request was received.
  *
  * The socket level management and I/O is done by a Server which owns us.
  * The scope of this objects control over a socket consists of the data
  * buffer received from the Server with an initially unknown length.
  * When that length is known it sets the end boundary of our access to the
  * buffer.
  *
  * The individual processing actions are done by other Jobs which we start.
  *
  * When a stream is completed the finished() method needs to be called which
  * will perform all cleanup and deregistration operations. If the reason for
  * finishing is an error, then notifyIoError() needs to be called prior to
  * the finished() method.
@@ -134,36 +137,39 @@
         unsigned parsed_ok:1; ///< Was this parsed correctly?
     } flags;
 
     bool mayUseConnection() const {return mayUseConnection_;}
 
     void mayUseConnection(bool aBool) {
         mayUseConnection_ = aBool;
         debugs(33, 3, "This " << this << " marked " << aBool);
     }
 
     class DeferredParams
     {
 
     public:
         clientStreamNode *node;
         HttpReply *rep;
         StoreIOBuffer queuedBuffer;
     };
 
     DeferredParams deferredparams;
     int64_t writtenToSocket;
 
 private:
     void prepareReply(HttpReply *);
     void packChunk(const StoreIOBuffer &bodyData, MemBuf &);
     void packRange(StoreIOBuffer const &, MemBuf *);
     void doClose();
 
     bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */
     bool connRegistered_;
+#if USE_DELAY_POOLS
+    MessageBucket::Pointer writeQuotaHandler; ///< response write limiter, if configured
+#endif
 };
 
 } // namespace Http
 
 #endif /* SQUID_SRC_HTTP_STREAM_H */
 

