Hello,

This patch introduces a new "response_delay_pools" feature. I recently
posted it (with a detailed description) to the thread for preliminary
review, marked with the "preview" flag.  This version conforms to the
latest v5 (r15011) and also fixes the following problems:

* MessageBucket::theAggregate pointer became invalid during
  reconfiguration.

* Response delay pools got stuck after reconfiguration, consuming
  no more data.

* A performance improvement: MessageDelayPools::Update() was called
  every second (via eventAdd), consuming CPU cycles even when no
  connections were served.


Regards,
Eduard.

Added response delay pools feature for Squid-to-client speed limiting.

The feature restricts Squid-to-client bandwidth only.  It applies to
both cache hits and misses.

When Squid starts delivering the final HTTP response to a client, Squid
checks response_delay_pool_access rules (supporting fast ACLs only), in
the order they were declared. The first rule with a matching ACL wins.
If (and only if) an "allow" rule won, Squid assigns the response to the
corresponding named delay pool.
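
The first-match rule above can be sketched as follows. This is only an
illustration of the described behavior, not the code in this patch:
Response, AccessRule and selectPool() are hypothetical names, a plain
callable stands in for a fast ACL, and std::optional assumes C++17.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-ins; these are not Squid's real ACL or reply types.
struct Response {
    std::string mimeType;
};

struct AccessRule {
    std::string poolName;                       // pool named by the rule
    bool allow;                                 // true for "allow", false for "deny"
    std::function<bool(const Response &)> acl;  // stands in for a fast ACL
};

// Walks the rules in declaration order. The first rule whose ACL matches
// decides: an "allow" rule yields its pool, a "deny" rule yields no pool.
std::optional<std::string>
selectPool(const std::vector<AccessRule> &rules, const Response &response)
{
    for (const auto &rule : rules) {
        assert(rule.acl && "every rule needs an ACL callable");
        if (!rule.acl(response))
            continue; // no match, try the next rule
        if (rule.allow)
            return rule.poolName;
        return std::nullopt; // a matching "deny" rule stops the search
    }
    return std::nullopt; // nothing matched, so the response is not limited
}
```

Note that a matching "deny" rule ends the search, so later "allow"
rules are never consulted for that response.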

If a response is assigned to a delay pool, the response becomes subject
to the configured bucket and aggregate bandwidth limits of that pool,
similar to the current "class 2" server-side delay pools, but with a
brand new, dedicated "individual" filled bucket assigned to the
matched response.
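
A minimal sketch of the two-level limit described above: a write is
capped by the smaller of the response's own bucket and the shared
aggregate bucket, and every write drains both. Bucket and
ResponseLimiter are hypothetical names. The sketch takes elapsed time
as an argument instead of reading a clock, and it does not model the
"initial fat" handling in BandwidthBucket::refillBucket().

```cpp
#include <algorithm>
#include <cassert>

// Simplified token bucket. Elapsed time is passed in explicitly so that
// the behavior is deterministic.
struct Bucket {
    double level;    // bytes that may be written now
    double speed;    // refill rate in bytes per second
    double maxLevel; // upper bound for level

    void refill(double seconds) {
        assert(seconds >= 0);
        level = std::min(maxLevel, level + speed * seconds);
    }

    void drain(double bytes) {
        level = std::max(0.0, level - bytes);
    }
};

// One limiter per delayed response: a dedicated bucket plus a pointer to
// the aggregate bucket shared by all responses in the same pool.
struct ResponseLimiter {
    Bucket individual;
    Bucket *aggregate;

    // Bytes allowed right now: the tighter of the two limits.
    double quota() const {
        return std::min(individual.level, aggregate->level);
    }

    // Writes as much of `wanted` as the quota allows and charges both buckets.
    double write(double wanted) {
        const double n = std::min(wanted, quota());
        individual.drain(n);
        aggregate->drain(n);
        return n;
    }
};
```

With two responses sharing one aggregate bucket, the second response
only gets what the first one left in the aggregate, even if its own
bucket is still full.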

The new feature serves the same purpose as the existing client-side
pools: both features limit Squid-to-client bandwidth. Their common
interface was placed into a new base BandwidthBucket class.  The
difference is that client-side pools do not aggregate clients and always
use one bucket per client IP. A response may be subject to both kinds
of pools. In such situations, only the matched response delay pool is
used for Squid-to-client speed limiting.

Accurate SMP support (with the aggregate bucket shared among
workers) is outside the scope of this patch. In SMP configurations,
Squid should automatically divide the aggregate_speed_limit and
max_aggregate_size values among the configured number of Squid
workers.
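
The per-worker division could look roughly like the sketch below.
AggregateLimits and perWorkerLimits() are hypothetical names, and
rounding down with integer division is an assumption; the text above
does not say how remainders are handled.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical holder for the two aggregate settings of one pool.
struct AggregateLimits {
    std::uint64_t speedLimit; // aggregate_speed_limit, bytes per second
    std::uint64_t maxSize;    // max_aggregate_size, bytes
};

// Splits the configured aggregate limits evenly among the workers.
// Integer division rounds down, so the sum of all shares never exceeds
// the configured values.
AggregateLimits
perWorkerLimits(const AggregateLimits &configured, unsigned workers)
{
    assert(workers > 0);
    return { configured.speedLimit / workers, configured.maxSize / workers };
}
```

Because the workers do not share a bucket, a single busy worker cannot
use the share of an idle one, which is why this is only an
approximation of a truly shared aggregate limit.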

=== added file 'src/BandwidthBucket.cc'
--- src/BandwidthBucket.cc	1970-01-01 00:00:00 +0000
+++ src/BandwidthBucket.cc	2016-10-26 14:09:11 +0000
@@ -0,0 +1,111 @@
+/*
+ * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+
+#include "BandwidthBucket.h"
+#include "ClientInfo.h"
+#include "comm/Connection.h"
+#include "Debug.h"
+#include "fde.h"
+
+extern double current_dtime;
+
+BandwidthBucket::BandwidthBucket(const int aWriteSpeedLimit, const double anInitialBurst,
+                                 const double aHighWatermark) :
+    bucketSize(anInitialBurst),
+    selectWaiting(false),
+    writeSpeedLimit(aWriteSpeedLimit),
+    bucketSizeLimit(aHighWatermark)
+{
+    getCurrentTime();
+    /* put current time to have something sensible here */
+    prevTime = current_dtime;
+}
+
+void
+BandwidthBucket::refillBucket()
+{
+    // all these times are in seconds, with double precision
+    const double currTime = current_dtime;
+    const double timePassed = currTime - prevTime;
+
+    // Calculate allowance for the time passed. Use double to avoid
+    // accumulating rounding errors for small intervals. For example, always
+    // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error.
+    const double gain = timePassed * writeSpeedLimit;
+
+    // XXX: Decide whether to add 'hash' field like ClientInfo::hash
+    //  debugs(77,5, HERE << currTime << " clt" << (const char*)hash.key << ": " <<
+    //         bucketSize << " + (" << timePassed << " * " << writeSpeedLimit <<
+    //         " = " << gain << ')');
+
+    // to further combat error accumulation during micro updates,
+    // quit before updating time if we cannot add at least one byte
+    if (gain < 1.0)
+        return;
+
+    prevTime = currTime;
+
+    // for "first" connections, drain initial fat before refilling but keep
+    // updating prevTime to avoid bursts after the fat is gone
+    if (bucketSize > bucketSizeLimit) {
+        debugs(77,4, HERE << "not refilling while draining initial fat");
+        return;
+    }
+
+    bucketSize += gain;
+
+    // obey quota limits
+    if (bucketSize > bucketSizeLimit)
+        bucketSize = bucketSizeLimit;
+}
+
+bool
+BandwidthBucket::applyQuota(int &nleft, Comm::IoCallback *state)
+{
+    const int q = quota();
+    if (!q)
+        return false;
+    const int nleft_corrected = min(nleft, q);
+    if (nleft != nleft_corrected) {
+        debugs(77, 5, state->conn << " writes only " <<
+               nleft_corrected << " out of " << nleft);
+        nleft = nleft_corrected;
+    }
+    return true;
+}
+
+void
+BandwidthBucket::reduceBucket(const int len)
+{
+    if (len <= 0)
+        return;
+    bucketSize -= len;
+    if (bucketSize < 0.0) {
+        debugs(77, DBG_IMPORTANT, "drained too much"); // should not happen
+        bucketSize = 0;
+    }
+}
+
+BandwidthBucket *
+BandwidthBucket::SelectBucket(fde *f)
+{
+    BandwidthBucket *bucket = f->writeQuotaHandler.getRaw();
+    if (!bucket) {
+        ClientInfo *clientInfo = f->clientInfo;
+        if (clientInfo && clientInfo->writeLimitingActive)
+            bucket = clientInfo;
+    }
+    return bucket;
+}
+
+#endif /* USE_DELAY_POOLS */
+

=== added file 'src/BandwidthBucket.h'
--- src/BandwidthBucket.h	1970-01-01 00:00:00 +0000
+++ src/BandwidthBucket.h	2017-01-22 08:40:53 +0000
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef BANDWIDTHBUCKET_H
+#define BANDWIDTHBUCKET_H
+
+#if USE_DELAY_POOLS
+
+#include "comm/IoCallback.h"
+
+class fde;
+
+/// Base class for Squid-to-client bandwidth limiting
+class BandwidthBucket
+{
+public:
+    BandwidthBucket(const int aWriteSpeedLimit, const double anInitialBurst,
+                    const double aHighWatermark);
+    virtual ~BandwidthBucket() {}
+
+    static BandwidthBucket *SelectBucket(fde *f);
+
+    /// \returns the number of bytes this bucket allows to write,
+    /// also considering aggregates, if any.
+    virtual int quota() = 0;
+    /// Adjusts nleft to not exceed the current bucket quota value,
+    /// if needed.
+    virtual bool applyQuota(int &nleft, Comm::IoCallback *state);
+    /// Will plan another write call.
+    virtual void scheduleWrite(Comm::IoCallback *state) = 0;
+    /// Performs cleanup when the related file descriptor becomes closed.
+    virtual void onFdClosed() { selectWaiting = false; }
+    /// Decreases the bucket level.
+    virtual void reduceBucket(const int len);
+
+protected:
+    /// Increases the bucket level with the writeSpeedLimit speed.
+    void refillBucket();
+
+public:
+    double bucketSize; ///< how much can be written now
+    bool selectWaiting; ///< is between commSetSelect and commHandleWrite
+
+protected:
+    double prevTime; ///< previous time when we checked
+    double writeSpeedLimit;///< Write speed limit in bytes per second, can be less than 1, if too close to zero this could result in timeouts from client
+    double bucketSizeLimit;  ///< maximum bucket size
+};
+
+#endif /* USE_DELAY_POOLS */
+
+#endif
+

=== modified file 'src/ClientDelayConfig.cc'
--- src/ClientDelayConfig.cc	2017-01-01 00:12:22 +0000
+++ src/ClientDelayConfig.cc	2017-01-22 08:42:37 +0000
@@ -1,103 +1,112 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #include "squid.h"
 #include "acl/Acl.h"
 #include "acl/Gadgets.h"
 #include "ClientDelayConfig.h"
 #include "ConfigParser.h"
 #include "Parsing.h"
 #include "Store.h"
 
+ClientDelayPool::~ClientDelayPool()
+{
+    if (access)
+        aclDestroyAccessList(&access);
+}
+
 void ClientDelayPool::dump(StoreEntry * entry, unsigned int poolNumberMinusOne) const
 {
     LOCAL_ARRAY(char, nom, 32);
     snprintf(nom, 32, "client_delay_access %d", poolNumberMinusOne + 1);
     dump_acl_access(entry, nom, access);
     storeAppendPrintf(entry, "client_delay_parameters %d %d %" PRId64 "\n", poolNumberMinusOne + 1, rate,highwatermark);
     storeAppendPrintf(entry, "\n");
 }
 
 void
 ClientDelayConfig::finalize()
 {
     for (unsigned int i = 0; i < pools.size(); ++i) {
         /* pools require explicit 'allow' to assign a client into them */
-        if (!pools[i].access) {
+        if (!pools[i]->access) {
             debugs(77, DBG_IMPORTANT, "client_delay_pool #" << (i+1) <<
                    " has no client_delay_access configured. " <<
                    "No client will ever use it.");
         }
     }
 }
 
-void ClientDelayConfig::freePoolCount()
+ClientDelayConfig::~ClientDelayConfig()
 {
-    pools.clear();
+    freePools();
 }
 
 void ClientDelayConfig::dumpPoolCount(StoreEntry * entry, const char *name) const
 {
     if (pools.size()) {
         storeAppendPrintf(entry, "%s %d\n", name, (int)pools.size());
         for (unsigned int i = 0; i < pools.size(); ++i)
-            pools[i].dump(entry, i);
+            pools[i]->dump(entry, i);
+    }
+}
+
+void
+ClientDelayConfig::freePools()
+{
+    if (!pools.empty()) {
+        for (auto p: pools)
+            delete p;
+        pools.clear();
     }
 }
 
 void ClientDelayConfig::parsePoolCount()
 {
     if (pools.size()) {
         debugs(3, DBG_CRITICAL, "parse_client_delay_pool_count: multiple client_delay_pools lines, aborting all previous client_delay_pools config");
-        clean();
+        freePools();
     }
     unsigned short pools_;
     ConfigParser::ParseUShort(&pools_);
     for (int i = 0; i < pools_; ++i) {
-        pools.push_back(ClientDelayPool());
+        pools.push_back(new ClientDelayPool());
     }
 }
 
 void ClientDelayConfig::parsePoolRates()
 {
     unsigned short pool;
     ConfigParser::ParseUShort(&pool);
 
     if (pool < 1 || pool > pools.size()) {
         debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " << pool << " not in 1 .. " << pools.size());
         return;
     }
 
     --pool;
 
-    pools[pool].rate = GetInteger();
-    pools[pool].highwatermark = GetInteger64();
+    pools[pool]->rate = GetInteger();
+    pools[pool]->highwatermark = GetInteger64();
 }
 
 void ClientDelayConfig::parsePoolAccess(ConfigParser &parser)
 {
     unsigned short pool;
 
     ConfigParser::ParseUShort(&pool);
 
     if (pool < 1 || pool > pools.size()) {
         debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " << pool << " not in 1 .. " << pools.size());
         return;
     }
 
     --pool;
-    aclParseAccessLine("client_delay_access", parser, &pools[pool].access);
-}
-
-void ClientDelayConfig::clean()
-{
-    for (unsigned int i = 0; i < pools.size(); ++i) {
-        aclDestroyAccessList(&pools[i].access);
-    }
+    aclParseAccessLine("client_delay_access", parser, &pools[pool]->access);
 }
 

=== modified file 'src/ClientDelayConfig.h'
--- src/ClientDelayConfig.h	2017-01-01 00:12:22 +0000
+++ src/ClientDelayConfig.h	2017-01-22 08:42:37 +0000
@@ -1,60 +1,66 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef SQUID_CLIENTDELAYCONFIG_H
 #define SQUID_CLIENTDELAYCONFIG_H
 
 #include "acl/forward.h"
 
 #include <vector>
 
 class StoreEntry;
 class ConfigParser;
 
 /// \ingroup DelayPoolsAPI
 
 /* represents one client write limiting delay 'pool' */
 class ClientDelayPool
 {
 public:
     ClientDelayPool()
         :   access(NULL), rate(0), highwatermark(0) {}
+    ~ClientDelayPool();
+    ClientDelayPool(const ClientDelayPool &) = delete;
+    ClientDelayPool &operator=(const ClientDelayPool &) = delete;
+
     void dump (StoreEntry * entry, unsigned int poolNumberMinusOne) const;
     acl_access *access;
     int rate;
     int64_t highwatermark;
 };
 
-typedef std::vector<ClientDelayPool> ClientDelayPools;
+typedef std::vector<ClientDelayPool*> ClientDelayPools;
 
 /* represents configuration of client write limiting delay pools */
 class ClientDelayConfig
 {
 public:
     ClientDelayConfig()
         :   initial(50) {}
-    void freePoolCount();
+    ~ClientDelayConfig();
+    ClientDelayConfig(const ClientDelayConfig &) = delete;
+    ClientDelayConfig &operator=(const ClientDelayConfig &) = delete;
+
+    void freePools();
     void dumpPoolCount(StoreEntry * entry, const char *name) const;
     /* parsing of client_delay_pools - number of pools */
     void parsePoolCount();
     /* parsing of client_delay_parameters lines */
     void parsePoolRates();
     /* parsing client_delay_access lines */
     void parsePoolAccess(ConfigParser &parser);
 
     void finalize(); ///< checks pools configuration
 
     /* initial bucket level, how fill bucket at startup */
     unsigned short initial;
     ClientDelayPools pools;
-private:
-    void clean();
 };
 
 #endif // SQUID_CLIENTDELAYCONFIG_H
 

=== modified file 'src/ClientInfo.h'
--- src/ClientInfo.h	2017-01-01 00:12:22 +0000
+++ src/ClientInfo.h	2017-01-22 08:42:37 +0000
@@ -1,113 +1,117 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef SQUID__SRC_CLIENTINFO_H
 #define SQUID__SRC_CLIENTINFO_H
 
+#if USE_DELAY_POOLS
+#include "BandwidthBucket.h"
+#endif
 #include "base/ByteCounter.h"
 #include "cbdata.h"
 #include "enums.h"
 #include "hash.h"
 #include "ip/Address.h"
 #include "LogTags.h"
 #include "mem/forward.h"
 #include "typedefs.h"
 
 #include <deque>
 
 #if USE_DELAY_POOLS
 class CommQuotaQueue;
 #endif
 
-class ClientInfo
+class ClientInfo : public hash_link
+#if USE_DELAY_POOLS
+    , public BandwidthBucket
+#endif
 {
     MEMPROXY_CLASS(ClientInfo);
 
 public:
     explicit ClientInfo(const Ip::Address &);
     ~ClientInfo();
 
-    hash_link hash;             /* must be first */
-
     Ip::Address addr;
 
     struct Protocol {
         Protocol() : n_requests(0) {
             memset(result_hist, 0, sizeof(result_hist));
         }
 
         int result_hist[LOG_TYPE_MAX];
         int n_requests;
         ByteCounter kbytes_in;
         ByteCounter kbytes_out;
         ByteCounter hit_kbytes_out;
     } Http, Icp;
 
     struct Cutoff {
         Cutoff() : time(0), n_req(0), n_denied(0) {}
 
         time_t time;
         int n_req;
         int n_denied;
     } cutoff;
     int n_established;          /* number of current established connections */
     time_t last_seen;
 #if USE_DELAY_POOLS
-    double writeSpeedLimit;///< Write speed limit in bytes per second, can be less than 1, if too close to zero this could result in timeouts from client
-    double prevTime; ///< previous time when we checked
-    double bucketSize; ///< how much can be written now
-    double bucketSizeLimit;  ///< maximum bucket size
     bool writeLimitingActive; ///< Is write limiter active
     bool firstTimeConnection;///< is this first time connection for this client
 
     CommQuotaQueue *quotaQueue; ///< clients waiting for more write quota
     int rationedQuota; ///< precomputed quota preserving fairness among clients
     int rationedCount; ///< number of clients that will receive rationedQuota
-    bool selectWaiting; ///< is between commSetSelect and commHandleWrite
     bool eventWaiting; ///< waiting for commHandleWriteHelper event to fire
 
     // all those functions access Comm fd_table and are defined in comm.cc
     bool hasQueue() const;  ///< whether any clients are waiting for write quota
     bool hasQueue(const CommQuotaQueue*) const;  ///< has a given queue
     unsigned int quotaEnqueue(int fd); ///< client starts waiting in queue; create the queue if necessary
     int quotaPeekFd() const; ///< retuns the next fd reservation
     unsigned int quotaPeekReserv() const; ///< returns the next reserv. to pop
     void quotaDequeue(); ///< pops queue head from queue
     void kickQuotaQueue(); ///< schedule commHandleWriteHelper call
-    int quotaForDequed(); ///< allocate quota for a just dequeued client
-    void refillBucket(); ///< adds bytes to bucket based on rate and time
+
+    /* BandwidthBucket API */
+    virtual int quota() override; ///< allocate quota for a just dequeued client
+    virtual bool applyQuota(int &nleft, Comm::IoCallback *state) override;
+    virtual void scheduleWrite(Comm::IoCallback *state) override;
+    virtual void onFdClosed() override;
+    virtual void reduceBucket(int len) override;
 
     void quotaDumpQueue(); ///< dumps quota queue for debugging
 
     /**
      * Configure client write limiting (note:"client" here means - IP). It is called
      * by httpAccept in client_side.cc, where the initial bucket size (anInitialBurst)
      * computed, using the configured maximum bucket vavlue and configured initial
      * bucket value(50% by default).
      *
      * \param  writeSpeedLimit is speed limit configured in config for this pool
      * \param  initialBurst is initial bucket size to use for this client(i.e. client can burst at first)
      *  \param highWatermark is maximum bucket value
      */
     void setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark);
 #endif /* USE_DELAY_POOLS */
 };
 
 #if USE_DELAY_POOLS
 // a queue of Comm clients waiting for I/O quota controlled by delay pools
 class CommQuotaQueue
 {
     CBDATA_CLASS(CommQuotaQueue);
 
 public:
     CommQuotaQueue(ClientInfo *info);
     ~CommQuotaQueue();
 
     bool empty() const { return fds.empty(); }
     size_t size() const { return fds.size(); }
     int front() const { return fds.front(); }

=== modified file 'src/Makefile.am'
--- src/Makefile.am	2017-01-08 05:12:44 +0000
+++ src/Makefile.am	2017-01-22 08:42:37 +0000
@@ -74,68 +74,74 @@
 SUBDIRS += adaptation
 endif
 DIST_SUBDIRS += adaptation
 
 if ENABLE_ESI
 SUBDIRS += esi
 ESI_LIBS = \
 	esi/libesi.la \
 	$(top_builddir)/lib/libTrie/libTrie.a \
 	$(XMLLIB) \
 	$(EXPATLIB)
 else
 ESI_LIBS =
 endif
 DIST_SUBDIRS += esi
 
 DELAY_POOL_ALL_SOURCE = \
 	CommonPool.h \
 	CompositePoolNode.h \
 	delay_pools.cc \
 	DelayId.cc \
 	DelayId.h \
 	DelayIdComposite.h \
 	DelayBucket.cc \
 	DelayBucket.h \
 	DelayConfig.cc \
 	DelayConfig.h \
 	DelayPool.cc \
 	DelayPool.h \
 	DelayPools.h \
+	MessageDelayPools.h \
+	MessageDelayPools.cc \
 	DelaySpec.cc \
 	DelaySpec.h \
 	DelayTagged.cc \
 	DelayTagged.h \
 	DelayUser.cc \
 	DelayUser.h \
 	DelayVector.cc \
 	DelayVector.h \
+	BandwidthBucket.cc \
+	BandwidthBucket.h \
+	MessageBucket.cc \
+	MessageBucket.h \
 	NullDelayId.cc \
 	NullDelayId.h \
 	ClientDelayConfig.cc \
 	ClientDelayConfig.h
 
 if ENABLE_DELAY_POOLS
 DELAY_POOL_SOURCE = $(DELAY_POOL_ALL_SOURCE)
 else
 DELAY_POOL_SOURCE =
 endif
 
 if ENABLE_XPROF_STATS
 XPROF_STATS_SOURCE = ProfStats.cc
 else
 XPROF_STATS_SOURCE =
 endif
 
 if ENABLE_HTCP
 HTCPSOURCE = htcp.cc htcp.h
 endif
 
 if ENABLE_LEAKFINDER
 LEAKFINDERSOURCE =  LeakFinder.cc
 else
 LEAKFINDERSOURCE =
 endif
 
 if ENABLE_UNLINKD
 UNLINKDSOURCE = unlinkd.h unlinkd.cc
 UNLINKD = unlinkd
@@ -940,60 +946,61 @@
 	HttpHeader.cc \
 	HttpHeaderMask.h \
 	HttpHeaderFieldInfo.h \
 	HttpHeaderTools.h \
 	HttpHeaderTools.cc \
 	HttpControlMsg.cc \
 	HttpControlMsg.h \
 	HttpMsg.cc \
 	HttpMsg.h \
 	HttpReply.cc \
 	HttpReply.h \
 	MasterXaction.cc \
 	MasterXaction.h \
 	MemBuf.cc \
 	MemBuf.h \
 	mime_header.h \
 	mime_header.cc \
 	Notes.h \
 	Notes.cc \
 	SquidString.h \
 	SquidTime.h \
 	tests/stub_SBufDetailedStats.cc \
 	String.cc \
 	StrList.h \
 	StrList.cc \
 	log/access_log.h \
 	tests/stub_access_log.cc \
 	cache_cf.h \
 	tests/stub_cache_cf.cc \
 	tests/stub_cache_manager.cc \
+	tests/stub_ClientDelayConfig.cc \
 	tests/stub_comm.cc \
 	tests/stub_debug.cc \
 	tests/stub_errorpage.cc \
 	tests/stub_event.cc \
 	tests/stub_fd.cc \
 	tests/stub_HelperChildConfig.cc \
 	tests/stub_libformat.cc \
 	tests/stub_libauth.cc \
 	tests/stub_libcomm.cc \
 	tests/stub_libmgr.cc \
 	tests/stub_libsecurity.cc \
 	tests/stub_libsslsquid.cc \
 	StatCounters.h \
 	StatCounters.cc \
 	StatHist.h \
 	tests/stub_StatHist.cc \
 	repl_modules.h \
 	tests/stub_store.cc \
 	tests/stub_store_stats.cc \
 	tools.h \
 	tests/stub_tools.cc \
 	tests/stub_HttpRequest.cc \
 	tests/testHttpReply.cc \
 	tests/testHttpReply.h \
 	tests/stub_time.cc \
 	url.cc \
 	wordlist.h \
 	wordlist.cc
 nodist_tests_testHttpReply_SOURCES=\
 	$(TESTSOURCES)
@@ -1052,60 +1059,61 @@
 	MasterXaction.h \
 	Notes.cc \
 	Notes.h \
 	mem_node.cc \
 	Parsing.cc \
 	tests/stub_libsecurity.cc \
 	SquidMath.cc \
 	StatCounters.cc \
 	StatCounters.h \
 	StatHist.h \
 	StrList.h \
 	StrList.cc \
 	tests/stub_StatHist.cc \
 	stmem.cc \
 	tests/stub_SBufDetailedStats.cc \
 	String.cc \
 	StoreIOState.cc \
 	tests/stub_StoreMeta.cc \
 	StoreMetaUnpacker.cc \
 	StoreSwapLogData.cc \
 	store_key_md5.h \
 	store_key_md5.cc \
 	swap_log_op.cc \
 	swap_log_op.h \
 	tests/stub_SwapDir.cc \
 	Transients.cc \
 	log/access_log.h \
 	tests/stub_access_log.cc \
 	cache_cf.h \
 	tests/stub_cache_cf.cc \
+	tests/stub_ClientDelayConfig.cc \
 	tests/stub_client_side.cc \
 	tests/stub_debug.cc \
 	tests/stub_DelayId.cc \
 	tests/stub_errorpage.cc \
 	fd.h \
 	tests/stub_fd.cc \
 	tests/stub_HttpRequest.cc \
 	tests/stub_HttpReply.cc \
 	tests/stub_ipc_TypedMsgHdr.cc \
 	tests/stub_libauth.cc \
 	tests/stub_libcomm.cc \
 	tests/stub_libdiskio.cc \
 	tests/stub_libformat.cc \
 	tests/stub_libmem.cc \
 	tests/stub_libsslsquid.cc \
 	tests/stub_MemObject.cc \
 	tests/stub_MemStore.cc \
 	mime.h \
 	tests/stub_mime.cc \
 	tests/stub_pconn.cc \
 	tests/stub_Port.cc \
 	repl_modules.h \
 	tests/stub_store.cc \
 	tests/stub_store_client.cc \
 	store_rebuild.h \
 	tests/stub_store_rebuild.cc \
 	tests/stub_store_stats.cc \
 	tests/stub_store_swapout.cc \
 	tools.h \
 	tests/stub_tools.cc \
@@ -2323,60 +2331,61 @@
 	parser/Tokenizer.h \
 	SquidString.h \
 	String.cc \
 	$(TESTSOURCES) \
 	tests/stub_debug.cc \
 	tests/stub_libmem.cc \
 	tests/stub_time.cc \
 	tests/stub_SBufDetailedStats.cc
 tests_testTokenizer_LDFLAGS = $(LIBADD_DL)
 tests_testTokenizer_LDADD = \
 	parser/libparser.la \
 	base/libbase.la \
 	sbuf/libsbuf.la \
 	$(top_builddir)/lib/libmiscutil.la \
 	$(LIBCPPUNIT_LIBS) \
 	$(COMPAT_LIB) \
 	$(XTRA_LIBS)
 
 tests_testHttp1Parser_SOURCES = \
 	Debug.h \
 	MemBuf.cc \
 	MemBuf.h \
 	tests/stub_MemObject.cc \
 	mime_header.cc \
 	mime_header.h \
 	String.cc \
 	cache_cf.h \
 	tests/stub_SBufDetailedStats.cc \
 	tests/stub_cache_cf.cc \
 	tests/stub_cache_manager.cc \
+	tests/stub_ClientDelayConfig.cc \
 	tests/stub_comm.cc \
 	tests/stub_cbdata.cc \
 	tests/stub_debug.cc \
 	tests/stub_event.cc \
 	tests/stub_HelperChildConfig.cc \
 	tests/stub_libmem.cc \
 	tests/stub_libsecurity.cc \
 	tests/stub_stmem.cc \
 	tests/stub_store.cc \
 	tests/stub_store_stats.cc \
 	tools.h \
 	tests/stub_tools.cc \
 	tests/testHttp1Parser.cc \
 	tests/testHttp1Parser.h \
 	tests/stub_time.cc \
 	wordlist.h \
 	wordlist.cc
 nodist_tests_testHttp1Parser_SOURCES = \
 	$(TESTSOURCES)
 tests_testHttp1Parser_LDADD= \
 	http/libhttp.la \
 	parser/libparser.la \
 	anyp/libanyp.la \
 	SquidConfig.o \
 	base/libbase.la \
 	ip/libip.la \
 	sbuf/libsbuf.la \
 	$(top_builddir)/lib/libmiscutil.la \
 	$(SSLLIB) \
 	$(LIBCPPUNIT_LIBS) \

=== added file 'src/MessageBucket.cc'
--- src/MessageBucket.cc	1970-01-01 00:00:00 +0000
+++ src/MessageBucket.cc	2017-01-04 10:51:52 +0000
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+#include "comm/Connection.h"
+#include "DelayPools.h"
+#include "fde.h"
+#include "MessageBucket.h"
+
+MessageBucket::MessageBucket(const int aWriteSpeedLimit, const double anInitialBurst,
+                             const double aHighWatermark, MessageDelayPool::Pointer pool) :
+    BandwidthBucket(aWriteSpeedLimit, anInitialBurst, aHighWatermark),
+    theAggregate(pool) {}
+
+void *
+MessageBucket::operator new(size_t size)
+{
+    DelayPools::MemoryUsed += sizeof (MessageBucket);
+    return ::operator new (size);
+}
+
+void
+MessageBucket::operator delete (void *address)
+{
+    DelayPools::MemoryUsed -= sizeof (MessageBucket);
+    ::operator delete (address);
+}
+
+int
+MessageBucket::quota()
+{
+    refillBucket();
+    theAggregate->refillBucket();
+    return min(bucketSize, static_cast<double>(theAggregate->level()));
+}
+
+void
+MessageBucket::reduceBucket(int len)
+{
+    BandwidthBucket::reduceBucket(len);
+    theAggregate->bytesIn(len);
+}
+
+void
+MessageBucket::scheduleWrite(Comm::IoCallback *state)
+{
+    fde *F = &fd_table[state->conn->fd];
+    if (!F->writeQuotaHandler->selectWaiting) {
+        F->writeQuotaHandler->selectWaiting = true;
+        // message delay pools limit this write; see checkTimeouts()
+        SetSelect(state->conn->fd, COMM_SELECT_WRITE, Comm::HandleWrite, state, 0);
+    }
+}
+
+#endif /* USE_DELAY_POOLS */
+

=== added file 'src/MessageBucket.h'
--- src/MessageBucket.h	1970-01-01 00:00:00 +0000
+++ src/MessageBucket.h	2016-12-26 13:28:25 +0000
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef MESSAGEBUCKET_H
+#define MESSAGEBUCKET_H
+
+#if USE_DELAY_POOLS
+
+#include "BandwidthBucket.h"
+#include "base/RefCount.h"
+#include "comm/forward.h"
+#include "MessageDelayPools.h"
+
+namespace Comm
+{
+extern PF HandleWrite;
+extern void SetSelect(int, unsigned int, PF *, void *, time_t);
+}
+
+/// Limits Squid-to-client bandwidth for each matching response
+class MessageBucket : public RefCountable, public BandwidthBucket
+{
+public:
+    typedef RefCount<MessageBucket> Pointer;
+
+    MessageBucket(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark, MessageDelayPool::Pointer pool);
+
+    void *operator new(size_t);
+    void operator delete (void *);
+
+    /* BandwidthBucket API */
+    virtual int quota() override;
+    virtual void scheduleWrite(Comm::IoCallback *state) override;
+    virtual void reduceBucket(int len) override;
+
+private:
+    MessageDelayPool::Pointer theAggregate;
+};
+
+#endif /* USE_DELAY_POOLS */
+
+#endif
+

=== added file 'src/MessageDelayPools.cc'
--- src/MessageDelayPools.cc	1970-01-01 00:00:00 +0000
+++ src/MessageDelayPools.cc	2017-01-04 11:21:07 +0000
@@ -0,0 +1,167 @@
+/*
+ * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+#include <algorithm>
+#include <map>
+#include "acl/Gadgets.h"
+#include "cache_cf.h"
+#include "ConfigParser.h"
+#include "DelaySpec.h"
+#include "event.h"
+#include "MessageBucket.h"
+#include "MessageDelayPools.h"
+#include "Parsing.h"
+#include "SquidTime.h"
+
+MessageDelayPools::~MessageDelayPools()
+{
+    freePools();
+}
+
+MessageDelayPools *
+MessageDelayPools::Instance()
+{
+    static MessageDelayPools pools;
+    return &pools;
+}
+
+MessageDelayPool::Pointer
+MessageDelayPools::pool(const SBuf &name)
+{
+    auto it = std::find_if(pools.begin(), pools.end(),
+    [&name](const MessageDelayPool::Pointer p) { return p->poolName == name; });
+    return it == pools.end() ? 0 : *it;
+}
+
+void
+MessageDelayPools::add(MessageDelayPool *p)
+{
+    const auto it = std::find_if(pools.begin(), pools.end(),
+    [&p](const MessageDelayPool::Pointer mp) { return mp->poolName == p->poolName; });
+    if (it != pools.end()) {
+        debugs(3, DBG_CRITICAL, "Ignoring duplicate " << p->poolName << " response delay pool");
+        return;
+    }
+    pools.push_back(p);
+}
+
+void
+MessageDelayPools::freePools()
+{
+    pools.clear();
+}
+
+MessageDelayPool::MessageDelayPool(const SBuf &name, uint64_t bucketSpeed, uint64_t bucketSize,
+                                   uint64_t aggregateSpeed, uint64_t aggregateSize, uint16_t initial):
+    access(0),
+    poolName(name),
+    bucketSpeedLimit(bucketSpeed),
+    maxBucketSize(bucketSize),
+    aggregateSpeedLimit(aggregateSpeed),
+    maxAggregateSize(aggregateSize),
+    initialFillLevel(initial),
+    lastUpdate(squid_curtime)
+{
+    theBucket.level() = maxAggregateSize;
+}
+
+MessageDelayPool::~MessageDelayPool()
+{
+    if (access)
+        aclDestroyAccessList(&access);
+}
+
+void
+MessageDelayPool::refillBucket()
+{
+    const int incr = squid_curtime - lastUpdate;
+    if (incr >= 1) {
+        lastUpdate = squid_curtime;
+        DelaySpec spec;
+        spec.restore_bps = aggregateSpeedLimit;
+        spec.max_bytes = maxAggregateSize;
+        theBucket.update(spec, incr);
+    }
+}
+
+MessageBucket::Pointer
+MessageDelayPool::createBucket()
+{
+    return new MessageBucket(bucketSpeedLimit, bucketSpeedLimit * (initialFillLevel / 100.0), maxBucketSize, this);
+}
+
+void
+MessageDelayConfig::parseResponseDelayPool()
+{
+    std::map<SBuf, int64_t> params = {
+        {SBuf("bucket_speed_limit="), -1},
+        {SBuf("max_bucket_size="), -1},
+        {SBuf("aggregate_speed_limit="), -1},
+        {SBuf("max_aggregate_size="), -1},
+        {SBuf("initial_fill_level="), 50}
+    };
+    const SBuf name(ConfigParser::NextToken());
+    if (name.isEmpty()) {
+        debugs(3, DBG_CRITICAL, "ERROR: required parameter \"name\" for response_delay_pool option missing.");
+        self_destruct();
+    }
+    while (const char *token = ConfigParser::NextToken()) {
+        auto it = params.begin();
+        for (; it != params.end(); ++it) {
+            SBuf n = it->first;
+            if (!strncmp(token, n.c_str(), n.length())) {
+                it->second = xatoll(token + it->first.length(), 10);
+                break;
+            }
+        }
+        if (it == params.end()) {
+            debugs(3, DBG_CRITICAL, "ERROR: option " << token << " is not supported for response_delay_pool.");
+            self_destruct();
+        }
+    }
+    for (const auto &p: params) {
+        if (p.second == -1) {
+            const SBuf failedOption = p.first.substr(0, p.first.length() - 1);
+            debugs(3, DBG_CRITICAL, "ERROR: required " << failedOption << " option missing.");
+            self_destruct();
+        }
+    }
+
+    MessageDelayPool *pool = new MessageDelayPool(name,
+            static_cast<uint64_t>(params[SBuf("bucket_speed_limit=")]),
+            static_cast<uint64_t>(params[SBuf("max_bucket_size=")]),
+            static_cast<uint64_t>(params[SBuf("aggregate_speed_limit=")]),
+            static_cast<uint64_t>(params[SBuf("max_aggregate_size=")]),
+            static_cast<uint16_t>(params[SBuf("initial_fill_level=")])
+                                                 );
+    MessageDelayPools::Instance()->add(pool);
+}
+
+void
+MessageDelayConfig::parseResponseDelayPoolAccess(ConfigParser &parser) {
+    const char *token = ConfigParser::NextToken();
+    if (!token) {
+        debugs(3, DBG_CRITICAL, "ERROR: required pool_name option missing");
+        return;
+    }
+    MessageDelayPool::Pointer pool = MessageDelayPools::Instance()->pool(SBuf(token));
+    if (pool)
+        aclParseAccessLine("response_delay_pool_access", parser, &pool->access);
+}
+
+void
+MessageDelayConfig::freePools()
+{
+    MessageDelayPools::Instance()->freePools();
+}
+
+#endif
+
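
Reviewer note: the "name=value" handling in parseResponseDelayPool() can be sketched outside Squid as follows. This is a minimal, standalone approximation, not the patch's code: parsePoolOptions and OptionTable are hypothetical names, std::string and std::strtoll stand in for SBuf and xatoll, and exceptions stand in for self_destruct().

```cpp
#include <cstdlib>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for the option table in parseResponseDelayPool():
// -1 marks a required option that has not been seen yet.
using OptionTable = std::map<std::string, long long>;

OptionTable parsePoolOptions(const std::vector<std::string> &tokens)
{
    OptionTable params = {
        {"bucket_speed_limit=", -1},
        {"max_bucket_size=", -1},
        {"aggregate_speed_limit=", -1},
        {"max_aggregate_size=", -1},
        {"initial_fill_level=", 50}   // the only option with a default
    };

    for (const auto &token : tokens) {
        bool known = false;
        for (auto &p : params) {
            // match on the "name=" prefix, then convert the remainder
            if (token.compare(0, p.first.size(), p.first) == 0) {
                p.second = std::strtoll(token.c_str() + p.first.size(), nullptr, 10);
                known = true;
                break;
            }
        }
        if (!known)
            throw std::invalid_argument("unsupported option: " + token);
    }

    // any value still at -1 was required but never supplied
    for (const auto &p : params) {
        if (p.second == -1)
            throw std::invalid_argument("required option missing: " +
                                        p.first.substr(0, p.first.size() - 1));
    }
    return params;
}
```

As in the patch, using -1 as the "unset" marker means an explicit negative value for a required option is indistinguishable from a missing one.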

=== added file 'src/MessageDelayPools.h'
--- src/MessageDelayPools.h	1970-01-01 00:00:00 +0000
+++ src/MessageDelayPools.h	2017-01-04 11:26:15 +0000
@@ -0,0 +1,102 @@
+/*
+ * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef MESSAGEDELAYPOOLS_H
+#define MESSAGEDELAYPOOLS_H
+
+#if USE_DELAY_POOLS
+
+#include "acl/Acl.h"
+#include "base/RefCount.h"
+#include "DelayBucket.h"
+#include "DelayPools.h"
+
+class MessageBucket;
+typedef RefCount<MessageBucket> MessageBucketPointer;
+
+/// \ingroup DelayPoolsAPI
+/// Represents one 'response' delay pool, creates individual response
+/// buckets and performs aggregate limiting for them
+class MessageDelayPool : public RefCountable
+{
+public:
+    typedef RefCount<MessageDelayPool> Pointer;
+
+    MessageDelayPool(const SBuf &name, uint64_t bucketSpeed, uint64_t bucketSize,
+                     uint64_t aggregateSpeed, uint64_t aggregateSize, uint16_t initial);
+    ~MessageDelayPool();
+    MessageDelayPool(const MessageDelayPool &) = delete;
+    MessageDelayPool &operator=(const MessageDelayPool &) = delete;
+
+    /// Increases the aggregate bucket level with the aggregateSpeedLimit speed.
+    void refillBucket();
+    /// decreases the aggregate level
+    void bytesIn(int qty) { theBucket.bytesIn(qty); }
+    /// current aggregate level
+    int level() { return theBucket.level(); }
+    /// creates an individual response bucket
+    MessageBucketPointer createBucket();
+
+    acl_access *access;
+    /// the response delay pool name
+    SBuf poolName;
+    /// the speed limit of an individual bucket (bytes/s)
+    uint64_t bucketSpeedLimit;
+    /// the maximum size of an individual bucket
+    uint64_t maxBucketSize;
+    /// the speed limit of the aggregate bucket (bytes/s)
+    uint64_t aggregateSpeedLimit;
+    /// the maximum size of the aggregate bucket
+    uint64_t maxAggregateSize;
+    /// the initial bucket size as a percentage of maxBucketSize
+    uint16_t initialFillLevel;
+    /// the aggregate bucket
+    DelayBucket theBucket;
+
+private:
+    /// Time the aggregate bucket level was last refilled.
+    time_t lastUpdate;
+};
+
+/// \ingroup DelayPoolsAPI
+/// represents all configured 'response' delay pools
+class MessageDelayPools
+{
+public:
+    MessageDelayPools(const MessageDelayPools &) = delete;
+    MessageDelayPools &operator=(const MessageDelayPools &) = delete;
+
+    static MessageDelayPools *Instance();
+
+    /// returns a MessageDelayPool with a given name or null otherwise
+    MessageDelayPool::Pointer pool(const SBuf &name);
+    /// appends a single MessageDelayPool, created during configuration
+    void add(MessageDelayPool *pool);
+    /// memory cleanup, performed during reconfiguration
+    void freePools();
+
+    std::vector<MessageDelayPool::Pointer> pools;
+
+private:
+    MessageDelayPools(){}
+    ~MessageDelayPools();
+    void Stats() { } // TODO
+};
+
+/// represents configuration for response delay pools
+class MessageDelayConfig
+{
+public:
+    void parseResponseDelayPool();
+    void parseResponseDelayPoolAccess(ConfigParser &parser);
+    void freePools();
+};
+
+#endif
+#endif
+
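
Reviewer note: the relationship between an individual response bucket and the pool's aggregate bucket, as described in the header comments above, can be illustrated with a simplified token bucket. This is only a sketch under stated assumptions: SketchBucket and quota are hypothetical names, not the patch's BandwidthBucket/MessageBucket API, and the initial level is taken as a percentage of the maximum size, as the documentation states.

```cpp
#include <algorithm>
#include <cmath>
#include <cstdint>

// Hypothetical, simplified token bucket; not the patch's BandwidthBucket.
struct SketchBucket {
    double level;       // bytes currently available
    double speedLimit;  // refill rate, bytes per second
    double maxSize;     // the level never exceeds this

    SketchBucket(double speed, double size, unsigned fillPercent) :
        level(size * (fillPercent / 100.0)), speedLimit(speed), maxSize(size) {}

    // add speedLimit * elapsedSeconds bytes, capped at maxSize
    void refill(double elapsedSeconds) {
        level = std::min(maxSize, level + speedLimit * elapsedSeconds);
    }
};

// Returns how many of the wanted bytes may be written now. The answer is
// limited by both the per-response bucket and the pool-wide aggregate
// bucket, and both buckets are drained by the granted amount.
uint64_t quota(SketchBucket &individual, SketchBucket &aggregate, uint64_t wanted)
{
    const double allowed = std::floor(std::min({static_cast<double>(wanted),
                                                individual.level,
                                                aggregate.level}));
    individual.level -= allowed;
    aggregate.level -= allowed;
    return static_cast<uint64_t>(allowed);
}
```

With this model, a response whose own bucket is full can still be held back when other responses in the same pool have drained the aggregate bucket.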

=== modified file 'src/SquidConfig.h'
--- src/SquidConfig.h	2017-01-01 00:12:22 +0000
+++ src/SquidConfig.h	2017-01-22 08:42:37 +0000
@@ -1,49 +1,54 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef SQUID_SQUIDCONFIG_H_
 #define SQUID_SQUIDCONFIG_H_
 
 #include "acl/forward.h"
 #include "base/RefCount.h"
 #include "base/YesNoNone.h"
+#if USE_DELAY_POOLS
 #include "ClientDelayConfig.h"
 #include "DelayConfig.h"
+#endif
 #include "helper/ChildConfig.h"
 #include "HttpHeaderTools.h"
 #include "ip/Address.h"
+#if USE_DELAY_POOLS
+#include "MessageDelayPools.h"
+#endif
 #include "Notes.h"
 #include "security/forward.h"
 #include "SquidTime.h"
 #if USE_OPENSSL
 #include "ssl/support.h"
 #endif
 #include "store/forward.h"
 
 #if USE_OPENSSL
 class sslproxy_cert_sign;
 class sslproxy_cert_adapt;
 #endif
 
 namespace Mgr
 {
 class ActionPasswordList;
 } // namespace Mgr
 class CachePeer;
 class CustomLog;
 class CpuAffinityMap;
 class external_acl;
 class HeaderManglers;
 class RefreshPattern;
 class RemovalPolicySettings;
 
 namespace AnyP
 {
 class PortCfg;
 }
 
@@ -407,60 +412,61 @@
         int eprt;
         int sanitycheck;
         int telnet;
     } Ftp;
     RefreshPattern *Refresh;
 
     Store::DiskConfig cacheSwap;
 
     struct {
         char *directory;
         int use_short_names;
     } icons;
     char *errorDirectory;
 #if USE_ERR_LOCALES
     char *errorDefaultLanguage;
     int errorLogMissingLanguages;
 #endif
     char *errorStylesheet;
 
     struct {
         int onerror;
     } retry;
 
     struct {
         int64_t limit;
     } MemPools;
 #if USE_DELAY_POOLS
 
     DelayConfig Delay;
     ClientDelayConfig ClientDelay;
+    MessageDelayConfig MessageDelay;
 #endif
 
     struct {
         struct {
             int average;
             int min_poll;
         } dns, udp, tcp;
     } comm_incoming;
     int max_open_disk_fds;
     int uri_whitespace;
     AclSizeLimit *rangeOffsetLimit;
 #if MULTICAST_MISS_STREAM
 
     struct {
 
         Ip::Address addr;
         int ttl;
         unsigned short port;
         char *encode_key;
     } mcast_miss;
 #endif
 
     /// request_header_access and request_header_replace
     HeaderManglers *request_header_access;
     /// reply_header_access and reply_header_replace
     HeaderManglers *reply_header_access;
     ///request_header_add access list
     HeaderWithAclList *request_header_add;
     ///reply_header_add access list
     HeaderWithAclList *reply_header_add;

=== modified file 'src/cache_cf.cc'
--- src/cache_cf.cc	2017-01-01 00:12:22 +0000
+++ src/cache_cf.cc	2017-01-22 08:42:37 +0000
@@ -1627,88 +1627,114 @@
 }
 
 static void
 parse_delay_pool_rates(DelayConfig * cfg)
 {
     cfg->parsePoolRates();
 }
 
 static void
 parse_delay_pool_access(DelayConfig * cfg)
 {
     cfg->parsePoolAccess(LegacyParser);
 }
 
 #endif
 
 #if USE_DELAY_POOLS
 #include "ClientDelayConfig.h"
 /* do nothing - free_client_delay_pool_count is the magic free function.
  * this is why client_delay_pool_count isn't just marked TYPE: u_short
  */
 
 #define free_client_delay_pool_access(X)
 #define free_client_delay_pool_rates(X)
 #define dump_client_delay_pool_access(X, Y, Z)
 #define dump_client_delay_pool_rates(X, Y, Z)
 
 static void
 free_client_delay_pool_count(ClientDelayConfig * cfg)
 {
-    cfg->freePoolCount();
+    cfg->freePools();
 }
 
 static void
 dump_client_delay_pool_count(StoreEntry * entry, const char *name, ClientDelayConfig &cfg)
 {
     cfg.dumpPoolCount (entry, name);
 }
 
 static void
 parse_client_delay_pool_count(ClientDelayConfig * cfg)
 {
     cfg->parsePoolCount();
 }
 
 static void
 parse_client_delay_pool_rates(ClientDelayConfig * cfg)
 {
     cfg->parsePoolRates();
 }
 
 static void
 parse_client_delay_pool_access(ClientDelayConfig * cfg)
 {
     cfg->parsePoolAccess(LegacyParser);
 }
 #endif
 
+#if USE_DELAY_POOLS
+#include "MessageDelayPools.h"
+
+#define free_response_delay_pool_access(X)
+#define dump_response_delay_pool_access(X, Y, Z)
+#define dump_response_delay_pool_parameters(X, Y, Z)
+
+static void
+free_response_delay_pool_parameters(MessageDelayConfig * cfg)
+{
+    cfg->freePools();
+}
+
+static void
+parse_response_delay_pool_parameters(MessageDelayConfig * cfg)
+{
+    cfg->parseResponseDelayPool();
+}
+
+static void
+parse_response_delay_pool_access(MessageDelayConfig * cfg)
+{
+    cfg->parseResponseDelayPoolAccess(LegacyParser);
+}
+#endif
+
 #if USE_HTTP_VIOLATIONS
 static void
 dump_http_header_access(StoreEntry * entry, const char *name, const HeaderManglers *manglers)
 {
     if (manglers)
         manglers->dumpAccess(entry, name);
 }
 
 static void
 parse_http_header_access(HeaderManglers **pm)
 {
     char *t = NULL;
 
     if ((t = ConfigParser::NextToken()) == NULL) {
         debugs(3, DBG_CRITICAL, "" << cfg_filename << " line " << config_lineno << ": " << config_input_line);
         debugs(3, DBG_CRITICAL, "parse_http_header_access: missing header name.");
         return;
     }
 
     if (!*pm)
         *pm = new HeaderManglers;
     HeaderManglers *manglers = *pm;
     headerMangler *mangler = manglers->track(t);
     assert(mangler);
 
     std::string directive = "http_header_access ";
     directive += t;
     aclParseAccessLine(directive.c_str(), LegacyParser, &mangler->access_list);
 }
 

=== modified file 'src/cf.data.depend'
--- src/cf.data.depend	2017-01-01 00:12:22 +0000
+++ src/cf.data.depend	2017-01-22 08:42:37 +0000
@@ -6,60 +6,62 @@
 ##
 #
 # type			dependencies
 #
 access_log		acl	logformat
 acl			external_acl_type auth_param
 acl_access		acl
 acl_address		acl
 acl_b_size_t		acl
 acl_tos			acl
 acl_nfmark		acl
 address
 authparam
 AuthSchemes		acl auth_param
 b_int64_t
 b_size_t
 b_ssize_t
 cachedir		cache_replacement_policy
 cachemgrpasswd
 ConfigAclTos
 configuration_includes_quoted_values
 CpuAffinityMap
 debug
 delay_pool_access	acl	delay_class
 delay_pool_class	delay_pools
 delay_pool_count
 delay_pool_rates	delay_class
 client_delay_pool_access	acl
 client_delay_pool_count
 client_delay_pool_rates
+response_delay_pool_access	acl
+response_delay_pool_parameters
 denyinfo		acl
 eol
 externalAclHelper	auth_param
 HelperChildConfig
 hostdomain		cache_peer
 hostdomaintype		cache_peer
 http_header_access	acl
 http_header_replace
 HeaderWithAclList	acl
 adaptation_access_type	adaptation_service_set adaptation_service_chain acl icap_service icap_class
 adaptation_service_set_type	icap_service ecap_service
 adaptation_service_chain_type	icap_service ecap_service
 icap_access_type	icap_class acl
 icap_class_type		icap_service
 icap_service_type
 icap_service_failure_limit
 icmp
 ecap_service_type
 int
 int64_t
 kb_int64_t
 kb_size_t
 logformat
 YesNoNone
 memcachemode
 note			acl
 obsolete
 onoff
 on_unsupported_protocol	acl
 peer

=== modified file 'src/cf.data.pre'
--- src/cf.data.pre	2017-01-07 03:00:43 +0000
+++ src/cf.data.pre	2017-01-22 08:42:37 +0000
@@ -7086,60 +7086,117 @@
 
 	    client_delay_access pool_ID allow|deny acl_name
 
 	All client_delay_access options are checked in their pool ID
 	order, starting with pool 1. The first checked pool with allowed
 	request is selected for the request. If no ACL matches or there
 	are no client_delay_access options, the request bandwidth is not
 	limited.
 
 	The ACL-selected pool is then used to find the
 	client_delay_parameters for the request. Client-side pools are
 	not used to aggregate clients. Clients are always aggregated
 	based on their source IP addresses (one bucket per source IP).
 
 	This clause only supports fast acl types.
 	See http://wiki.squid-cache.org/SquidFaq/SquidAcl for details.
 	Additionally, only the client TCP connection details are available.
 	ACLs testing HTTP properties will not work.
 
 	Please see delay_access for more examples.
 
 	Example:
 		client_delay_access 1 allow low_rate_network
 		client_delay_access 2 allow vips_network
 
 
 	See also client_delay_parameters and client_delay_pools.
 DOC_END
 
 COMMENT_START
+ RESPONSE DELAY POOL PARAMETERS
+ -----------------------------------------------------------------------------
+COMMENT_END
+
+NAME: response_delay_pool
+TYPE: response_delay_pool_parameters
+DEFAULT: none
+IFDEF: USE_DELAY_POOLS
+LOC: Config.MessageDelay
+DOC_START
+	This option configures client response bandwidth limits using the
+	following format:
+
+	response_delay_pool name [option=value] ...
+
+	name	the response delay pool name
+
+	options (all required except initial_fill_level):
+
+		bucket_speed_limit	the speed limit of an individual
+					bucket (bytes/s)
+
+		max_bucket_size  	the maximum size of a bucket
+
+		aggregate_speed_limit	the speed limit of the aggregate
+					bucket (bytes/s)
+
+		max_aggregate_size	the maximum size of the aggregate bucket
+
+		initial_fill_level	the initial bucket size as a percentage
+					of max_bucket_size (optional, default 50)
+
+	See also response_delay_pool_access.
+DOC_END
+
+NAME: response_delay_pool_access
+TYPE: response_delay_pool_access
+DEFAULT: none
+DEFAULT_DOC: Deny use of the pool, unless allow rules exist in squid.conf for the pool.
+IFDEF: USE_DELAY_POOLS
+LOC: Config.MessageDelay
+DOC_START
+	This option determines the response delay pool for the
+	response:
+
+	response_delay_pool_access pool_name allow|deny acl_name
+
+	All response_delay_pool_access options are checked in the order
+	they appear in this configuration file. The first rule with a
+	matching ACL wins. If (and only if) an "allow" rule won, Squid
+	assigns the response to the corresponding named delay pool.
+	This feature restricts Squid-to-client bandwidth only. If no
+	ACL matches or there are no response_delay_pool_access options,
+	the Squid-to-client bandwidth is not limited.
+DOC_END
+
+COMMENT_START
  WCCPv1 AND WCCPv2 CONFIGURATION OPTIONS
  -----------------------------------------------------------------------------
 COMMENT_END
 
 NAME: wccp_router
 TYPE: address
 LOC: Config.Wccp.router
 DEFAULT: any_addr
 DEFAULT_DOC: WCCP disabled.
 IFDEF: USE_WCCP
 DOC_START
 	Use this option to define your WCCP ``home'' router for
 	Squid.
 
 	wccp_router supports a single WCCP(v1) router
 
 	wccp2_router supports multiple WCCPv2 routers
 
 	only one of the two may be used at the same time and defines
 	which version of WCCP to use.
 DOC_END
 
 NAME: wccp2_router
 TYPE: IpAddress_list
 LOC: Config.Wccp2.router
 DEFAULT: none
 DEFAULT_DOC: WCCPv2 disabled.
 IFDEF: USE_WCCPv2
 DOC_START
 	Use this option to define your WCCP ``home'' router for

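Reviewer note: an illustrative squid.conf fragment using only the response_delay_pool and response_delay_pool_access directives documented in the cf.data.pre hunk. The pool name, the ACL name, the address range, and all numeric values are made up for the example. The pool is declared first because parseResponseDelayPoolAccess() looks the pool up by name.

```
acl slow_clients src 192.0.2.0/24
response_delay_pool slowPool1 bucket_speed_limit=50000 max_bucket_size=100000 aggregate_speed_limit=200000 max_aggregate_size=400000 initial_fill_level=90
response_delay_pool_access slowPool1 allow slow_clients
```

Responses that do not match any "allow" rule are not assigned to a pool and are not limited by this feature.
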
=== modified file 'src/client_db.cc'
--- src/client_db.cc	2017-01-01 00:12:22 +0000
+++ src/client_db.cc	2017-01-22 08:42:37 +0000
@@ -26,94 +26,85 @@
 #include "tools.h"
 
 #if SQUID_SNMP
 #include "snmp_core.h"
 #endif
 
 static hash_table *client_table = NULL;
 
 static ClientInfo *clientdbAdd(const Ip::Address &addr);
 static FREE clientdbFreeItem;
 static void clientdbStartGC(void);
 static void clientdbScheduledGC(void *);
 
 #if USE_DELAY_POOLS
 static int max_clients = 32768;
 #else
 static int max_clients = 32;
 #endif
 
 static int cleanup_running = 0;
 static int cleanup_scheduled = 0;
 static int cleanup_removed;
 
 #if USE_DELAY_POOLS
 #define CLIENT_DB_HASH_SIZE 65357
 #else
 #define CLIENT_DB_HASH_SIZE 467
 #endif
 
 ClientInfo::ClientInfo(const Ip::Address &ip) :
+#if USE_DELAY_POOLS
+    BandwidthBucket(0, 0, 0),
+#endif
     addr(ip),
     n_established(0),
     last_seen(0)
 #if USE_DELAY_POOLS
-    , writeSpeedLimit(0),
-    prevTime(0),
-    bucketSize(0),
-    bucketSizeLimit(0),
-    writeLimitingActive(false),
+    , writeLimitingActive(false),
     firstTimeConnection(true),
     quotaQueue(nullptr),
     rationedQuota(0),
     rationedCount(0),
-    selectWaiting(false),
     eventWaiting(false)
 #endif
 {
     debugs(77, 9, "ClientInfo constructed, this=" << static_cast<void*>(this));
-
-#if USE_DELAY_POOLS
-    getCurrentTime();
-    /* put current time to have something sensible here */
-    prevTime = current_dtime;
-#endif
-
     char *buf = static_cast<char*>(xmalloc(MAX_IPSTRLEN)); // becomes hash.key
-    hash.key = addr.toStr(buf,MAX_IPSTRLEN);
+    key = addr.toStr(buf,MAX_IPSTRLEN);
 }
 
 static ClientInfo *
 clientdbAdd(const Ip::Address &addr)
 {
     ClientInfo *c = new ClientInfo(addr);
-    hash_join(client_table, &c->hash);
+    hash_join(client_table, static_cast<hash_link*>(c));
     ++statCounter.client_http.clients;
 
     if ((statCounter.client_http.clients > max_clients) && !cleanup_running && cleanup_scheduled < 2) {
         ++cleanup_scheduled;
         eventAdd("client_db garbage collector", clientdbScheduledGC, NULL, 90, 0);
     }
 
     return c;
 }
 
 static void
 clientdbInit(void)
 {
     if (client_table)
         return;
 
     client_table = hash_create((HASHCMP *) strcmp, CLIENT_DB_HASH_SIZE, hash_string);
 }
 
 class ClientDbRr: public RegisteredRunner
 {
 public:
     /* RegisteredRunner API */
     virtual void useConfig();
 };
 RunnerRegistrationEntry(ClientDbRr);
 
 void
 ClientDbRr::useConfig()
 {
@@ -250,245 +241,237 @@
         NR = 150;
 
     ND = c->Icp.result_hist[LOG_UDP_DENIED] - c->cutoff.n_denied;
 
     p = 100.0 * ND / NR;
 
     if (p < 95.0)
         return 0;
 
     debugs(1, DBG_CRITICAL, "WARNING: Probable misconfigured neighbor at " << key);
 
     debugs(1, DBG_CRITICAL, "WARNING: " << ND << " of the last " << NR <<
            " ICP replies are DENIED");
 
     debugs(1, DBG_CRITICAL, "WARNING: No replies will be sent for the next " <<
            CUTOFF_SECONDS << " seconds");
 
     c->cutoff.time = squid_curtime;
 
     c->cutoff.n_req = c->Icp.n_requests;
 
     c->cutoff.n_denied = c->Icp.result_hist[LOG_UDP_DENIED];
 
     return 1;
 }
 
 void
 clientdbDump(StoreEntry * sentry)
 {
     const char *name;
-    ClientInfo *c;
     int icp_total = 0;
     int icp_hits = 0;
     int http_total = 0;
     int http_hits = 0;
     storeAppendPrintf(sentry, "Cache Clients:\n");
     hash_first(client_table);
 
-    while ((c = (ClientInfo *) hash_next(client_table))) {
-        storeAppendPrintf(sentry, "Address: %s\n", hashKeyStr(&c->hash));
+    while (hash_link *hash = hash_next(client_table)) {
+        const ClientInfo *c = static_cast<const ClientInfo *>(hash);
+        storeAppendPrintf(sentry, "Address: %s\n", hashKeyStr(hash));
         if ( (name = fqdncache_gethostbyaddr(c->addr, 0)) ) {
             storeAppendPrintf(sentry, "Name:    %s\n", name);
         }
         storeAppendPrintf(sentry, "Currently established connections: %d\n",
                           c->n_established);
         storeAppendPrintf(sentry, "    ICP  Requests %d\n",
                           c->Icp.n_requests);
 
         for (LogTags_ot l = LOG_TAG_NONE; l < LOG_TYPE_MAX; ++l) {
             if (c->Icp.result_hist[l] == 0)
                 continue;
 
             icp_total += c->Icp.result_hist[l];
 
             if (LOG_UDP_HIT == l)
                 icp_hits += c->Icp.result_hist[l];
 
             storeAppendPrintf(sentry, "        %-20.20s %7d %3d%%\n", LogTags(l).c_str(), c->Icp.result_hist[l], Math::intPercent(c->Icp.result_hist[l], c->Icp.n_requests));
         }
 
         storeAppendPrintf(sentry, "    HTTP Requests %d\n", c->Http.n_requests);
 
         for (LogTags_ot l = LOG_TAG_NONE; l < LOG_TYPE_MAX; ++l) {
             if (c->Http.result_hist[l] == 0)
                 continue;
 
             http_total += c->Http.result_hist[l];
 
             if (LogTags(l).isTcpHit())
                 http_hits += c->Http.result_hist[l];
 
             storeAppendPrintf(sentry,
                               "        %-20.20s %7d %3d%%\n",
                               LogTags(l).c_str(),
                               c->Http.result_hist[l],
                               Math::intPercent(c->Http.result_hist[l], c->Http.n_requests));
         }
 
         storeAppendPrintf(sentry, "\n");
     }
 
     storeAppendPrintf(sentry, "TOTALS\n");
     storeAppendPrintf(sentry, "ICP : %d Queries, %d Hits (%3d%%)\n",
                       icp_total, icp_hits, Math::intPercent(icp_hits, icp_total));
     storeAppendPrintf(sentry, "HTTP: %d Requests, %d Hits (%3d%%)\n",
                       http_total, http_hits, Math::intPercent(http_hits, http_total));
 }
 
 static void
 clientdbFreeItem(void *data)
 {
     ClientInfo *c = (ClientInfo *)data;
     delete c;
 }
 
 ClientInfo::~ClientInfo()
 {
-    safe_free(hash.key);
+    safe_free(key);
 
 #if USE_DELAY_POOLS
     if (CommQuotaQueue *q = quotaQueue) {
         q->clientInfo = NULL;
         delete q; // invalidates cbdata, cancelling any pending kicks
     }
 #endif
 
     debugs(77, 9, "ClientInfo destructed, this=" << static_cast<void*>(this));
 }
 
 void
 clientdbFreeMemory(void)
 {
     hashFreeItems(client_table, clientdbFreeItem);
     hashFreeMemory(client_table);
     client_table = NULL;
 }
 
 static void
 clientdbScheduledGC(void *)
 {
     cleanup_scheduled = 0;
     clientdbStartGC();
 }
 
 static void
 clientdbGC(void *)
 {
     static int bucket = 0;
     hash_link *link_next;
 
     link_next = hash_get_bucket(client_table, bucket++);
 
     while (link_next != NULL) {
         ClientInfo *c = (ClientInfo *)link_next;
         int age = squid_curtime - c->last_seen;
         link_next = link_next->next;
 
         if (c->n_established)
             continue;
 
         if (age < 24 * 3600 && c->Http.n_requests > 100)
             continue;
 
         if (age < 4 * 3600 && (c->Http.n_requests > 10 || c->Icp.n_requests > 10))
             continue;
 
         if (age < 5 * 60 && (c->Http.n_requests > 1 || c->Icp.n_requests > 1))
             continue;
 
         if (age < 60)
             continue;
 
-        hash_remove_link(client_table, &c->hash);
+        hash_remove_link(client_table, static_cast<hash_link*>(c));
 
         clientdbFreeItem(c);
 
         --statCounter.client_http.clients;
 
         ++cleanup_removed;
     }
 
     if (bucket < CLIENT_DB_HASH_SIZE)
         eventAdd("client_db garbage collector", clientdbGC, NULL, 0.15, 0);
     else {
         bucket = 0;
         cleanup_running = 0;
         max_clients = statCounter.client_http.clients * 3 / 2;
 
         if (!cleanup_scheduled) {
             cleanup_scheduled = 1;
             eventAdd("client_db garbage collector", clientdbScheduledGC, NULL, 6 * 3600, 0);
         }
 
         debugs(49, 2, "clientdbGC: Removed " << cleanup_removed << " entries");
     }
 }
 
 static void
 clientdbStartGC(void)
 {
     max_clients = statCounter.client_http.clients;
     cleanup_running = 1;
     cleanup_removed = 0;
     clientdbGC(NULL);
 }
 
 #if SQUID_SNMP
 
 Ip::Address *
 client_entry(Ip::Address *current)
 {
-    ClientInfo *c = NULL;
     char key[MAX_IPSTRLEN];
+    hash_first(client_table);
 
     if (current) {
         current->toStr(key,MAX_IPSTRLEN);
-        hash_first(client_table);
-        while ((c = (ClientInfo *) hash_next(client_table))) {
-            if (!strcmp(key, hashKeyStr(&c->hash)))
+        while (hash_link *hash = hash_next(client_table)) {
+            if (!strcmp(key, hashKeyStr(hash)))
                 break;
         }
-
-        c = (ClientInfo *) hash_next(client_table);
-    } else {
-        hash_first(client_table);
-        c = (ClientInfo *) hash_next(client_table);
     }
 
+    ClientInfo *c = static_cast<ClientInfo *>(hash_next(client_table));
+
     hash_last(client_table);
 
-    if (c)
-        return (&c->addr);
-    else
-        return (NULL);
-
+    return c ? &c->addr : nullptr;
 }
 
 variable_list *
 snmp_meshCtblFn(variable_list * Var, snint * ErrP)
 {
     char key[MAX_IPSTRLEN];
     ClientInfo *c = NULL;
     Ip::Address keyIp;
 
     *ErrP = SNMP_ERR_NOERROR;
     MemBuf tmp;
     debugs(49, 6, HERE << "Current : length=" << Var->name_length << ": " << snmpDebugOid(Var->name, Var->name_length, tmp));
     if (Var->name_length == 16) {
         oid2addr(&(Var->name[12]), keyIp, 4);
     } else if (Var->name_length == 28) {
         oid2addr(&(Var->name[12]), keyIp, 16);
     } else {
         *ErrP = SNMP_ERR_NOSUCHNAME;
         return NULL;
     }
 
     keyIp.toStr(key, sizeof(key));
     debugs(49, 5, HERE << "[" << key << "] requested!");
     c = (ClientInfo *) hash_lookup(client_table, key);
 
     if (c == NULL) {
         debugs(49, 5, HERE << "not found.");
         *ErrP = SNMP_ERR_NOSUCHNAME;
         return NULL;
     }

=== modified file 'src/client_side.cc'
--- src/client_side.cc	2017-01-01 00:12:22 +0000
+++ src/client_side.cc	2017-01-22 08:42:37 +0000
@@ -93,60 +93,61 @@
 #include "HttpReply.h"
 #include "HttpRequest.h"
 #include "ident/Config.h"
 #include "ident/Ident.h"
 #include "internal.h"
 #include "ipc/FdNotes.h"
 #include "ipc/StartListening.h"
 #include "log/access_log.h"
 #include "MemBuf.h"
 #include "MemObject.h"
 #include "mime_header.h"
 #include "parser/Tokenizer.h"
 #include "profiler/Profiler.h"
 #include "rfc1738.h"
 #include "security/NegotiationHistory.h"
 #include "servers/forward.h"
 #include "SquidConfig.h"
 #include "SquidTime.h"
 #include "StatCounters.h"
 #include "StatHist.h"
 #include "Store.h"
 #include "TimeOrTag.h"
 #include "tools.h"
 #include "URL.h"
 
 #if USE_AUTH
 #include "auth/UserRequest.h"
 #endif
 #if USE_DELAY_POOLS
 #include "ClientInfo.h"
+#include "MessageDelayPools.h"
 #endif
 #if USE_OPENSSL
 #include "ssl/bio.h"
 #include "ssl/context_storage.h"
 #include "ssl/gadgets.h"
 #include "ssl/helper.h"
 #include "ssl/ProxyCerts.h"
 #include "ssl/ServerBump.h"
 #include "ssl/support.h"
 #endif
 
 // for tvSubUsec() which should be in SquidTime.h
 #include "util.h"
 
 #include <climits>
 #include <cmath>
 #include <limits>
 
 #if LINGERING_CLOSE
 #define comm_close comm_lingering_close
 #endif
 
 /// dials clientListenerConnectionOpened call
 class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb
 {
 public:
     typedef void (*Handler)(AnyP::PortCfgPointer &portCfg, const Ipc::FdNoteId note, const Subscription::Pointer &sub);
     ListeningStartedDialer(Handler aHandler, AnyP::PortCfgPointer &aPortCfg, const Ipc::FdNoteId note, const Subscription::Pointer &aSub):
         handler(aHandler), portCfg(aPortCfg), portTypeNote(note), sub(aSub) {}
 
@@ -2472,77 +2473,77 @@
             Ident::Start(clientConnection, clientIdentDone, this);
     }
 #endif
 
     clientdbEstablished(clientConnection->remote, 1);
 
     needProxyProtocolHeader_ = port->flags.proxySurrogate;
     if (needProxyProtocolHeader_) {
         if (!proxyProtocolValidateClient()) // will close the connection on failure
             return;
     }
 
 #if USE_DELAY_POOLS
     fd_table[clientConnection->fd].clientInfo = NULL;
 
     if (Config.onoff.client_db) {
         /* it was said several times that client write limiter does not work if client_db is disabled */
 
         ClientDelayPools& pools(Config.ClientDelay.pools);
         ACLFilledChecklist ch(NULL, NULL, NULL);
 
         // TODO: we check early to limit error response bandwith but we
         // should recheck when we can honor delay_pool_uses_indirect
         // TODO: we should also pass the port details for myportname here.
         ch.src_addr = clientConnection->remote;
         ch.my_addr = clientConnection->local;
 
         for (unsigned int pool = 0; pool < pools.size(); ++pool) {
 
             /* pools require explicit 'allow' to assign a client into them */
-            if (pools[pool].access) {
-                ch.changeAcl(pools[pool].access);
+            if (pools[pool]->access) {
+                ch.changeAcl(pools[pool]->access);
                 allow_t answer = ch.fastCheck();
                 if (answer == ACCESS_ALLOWED) {
 
                     /*  request client information from db after we did all checks
                         this will save hash lookup if client failed checks */
                     ClientInfo * cli = clientdbGetInfo(clientConnection->remote);
                     assert(cli);
 
                     /* put client info in FDE */
                     fd_table[clientConnection->fd].clientInfo = cli;
 
                     /* setup write limiter for this request */
                     const double burst = floor(0.5 +
-                                               (pools[pool].highwatermark * Config.ClientDelay.initial)/100.0);
-                    cli->setWriteLimiter(pools[pool].rate, burst, pools[pool].highwatermark);
+                                               (pools[pool]->highwatermark * Config.ClientDelay.initial)/100.0);
+                    cli->setWriteLimiter(pools[pool]->rate, burst, pools[pool]->highwatermark);
                     break;
                 } else {
                     debugs(83, 4, HERE << "Delay pool " << pool << " skipped because ACL " << answer);
                 }
             }
         }
     }
 #endif
 
     // kids must extend to actually start doing something (e.g., reading)
 }
 
 /** Handle a new connection on an HTTP socket. */
 void
 httpAccept(const CommAcceptCbParams &params)
 {
     MasterXaction::Pointer xact = params.xaction;
     AnyP::PortCfgPointer s = xact->squidPort;
 
     // NP: it is possible the port was reconfigured when the call or accept() was queued.
 
     if (params.flag != Comm::OK) {
         // Its possible the call was still queued when the client disconnected
         debugs(33, 2, s->listenConn << ": accept failure: " << xstrerr(params.xerrno));
         return;
     }
 
     debugs(33, 4, params.conn << ": accepted");
     fd_note(params.conn->fd, "client http connect");
 

=== modified file 'src/client_side.h'
--- src/client_side.h	2017-01-01 00:12:22 +0000
+++ src/client_side.h	2017-01-22 08:42:37 +0000
@@ -1,59 +1,62 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 /* DEBUG: section 33    Client-side Routines */
 
 #ifndef SQUID_CLIENTSIDE_H
 #define SQUID_CLIENTSIDE_H
 
 #include "base/RunnersRegistry.h"
 #include "clientStreamForward.h"
 #include "comm.h"
 #include "helper/forward.h"
 #include "http/forward.h"
 #include "HttpControlMsg.h"
 #include "ipc/FdNotes.h"
 #include "sbuf/SBuf.h"
 #include "servers/Server.h"
 #if USE_AUTH
 #include "auth/UserRequest.h"
 #endif
 #if USE_OPENSSL
 #include "security/Handshake.h"
 #include "ssl/support.h"
 #endif
+#if USE_DELAY_POOLS
+#include "MessageBucket.h"
+#endif
 
 class ClientHttpRequest;
 class HttpHdrRangeSpec;
 
 #if USE_OPENSSL
 namespace Ssl
 {
 class ServerBump;
 }
 #endif
 
 /**
  * Legacy Server code managing a connection to a client.
  *
  * NP: presents AsyncJob API but does not operate autonomously as a Job.
  *     So Must() is not safe to use.
  *
  * Multiple requests (up to pipeline_prefetch) can be pipelined.
  * This object is responsible for managing which one is currently being
  * fulfilled and what happens to the queue if the current one causes the client
  * connection to be closed early.
  *
  * Act as a manager for the client connection and passes data in buffer to a
  * Parser relevant to the state (message headers vs body) that is being
  * processed.
  *
  * Performs HTTP message processing to kick off the actual HTTP request
  * handling objects (Http::Stream, ClientHttpRequest, HttpRequest).
  *
  * Performs SSL-Bump processing for switching between HTTP and HTTPS protocols.

=== modified file 'src/comm.cc'
--- src/comm.cc	2017-01-01 00:12:22 +0000
+++ src/comm.cc	2017-01-22 08:42:37 +0000
@@ -889,67 +889,63 @@
     PROF_start(comm_close);
 
     F->flags.close_request = true;
 
 #if USE_OPENSSL
     if (F->ssl) {
         AsyncCall::Pointer startCall=commCbCall(5,4, "commStartSslClose",
                                                 FdeCbPtrFun(commStartSslClose, NULL));
         FdeCbParams &startParams = GetCommParams<FdeCbParams>(startCall);
         startParams.fd = fd;
         ScheduleCallHere(startCall);
     }
 #endif
 
     // a half-closed fd may lack a reader, so we stop monitoring explicitly
     if (commHasHalfClosedMonitor(fd))
         commStopHalfClosedMonitor(fd);
     commUnsetFdTimeout(fd);
 
     // notify read/write handlers after canceling select reservations, if any
     if (COMMIO_FD_WRITECB(fd)->active()) {
         Comm::SetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
         COMMIO_FD_WRITECB(fd)->finish(Comm::ERR_CLOSING, errno);
     }
     if (COMMIO_FD_READCB(fd)->active()) {
         Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
         COMMIO_FD_READCB(fd)->finish(Comm::ERR_CLOSING, errno);
     }
 
 #if USE_DELAY_POOLS
-    if (ClientInfo *clientInfo = F->clientInfo) {
-        if (clientInfo->selectWaiting) {
-            clientInfo->selectWaiting = false;
-            // kick queue or it will get stuck as commWriteHandle is not called
-            clientInfo->kickQuotaQueue();
-        }
-    }
+    if (BandwidthBucket *bucket = BandwidthBucket::SelectBucket(F))
+        if (bucket->selectWaiting)
+            bucket->onFdClosed();
 #endif
 
     commCallCloseHandlers(fd);
 
     comm_empty_os_read_buffers(fd);
 
     AsyncCall::Pointer completeCall=commCbCall(5,4, "comm_close_complete",
                                     FdeCbPtrFun(comm_close_complete, NULL));
     FdeCbParams &completeParams = GetCommParams<FdeCbParams>(completeCall);
     completeParams.fd = fd;
     // must use async call to wait for all callbacks
     // scheduled before comm_close() to finish
     ScheduleCallHere(completeCall);
 
     PROF_stop(comm_close);
 }
 
 /* Send a udp datagram to specified TO_ADDR. */
 int
 comm_udp_sendto(int fd,
                 const Ip::Address &to_addr,
                 const void *buf,
                 int len)
 {
     PROF_start(comm_udp_sendto);
     ++ statCounter.syscalls.sock.sendtos;
 
     debugs(50, 3, "comm_udp_sendto: Attempt to send UDP packet to " << to_addr <<
            " using FD " << fd << " using Port " << comm_local_port(fd) );
 
@@ -1333,186 +1329,188 @@
 /// queues a given fd, creating the queue if necessary; returns reservation ID
 unsigned int
 ClientInfo::quotaEnqueue(int fd)
 {
     assert(quotaQueue);
     return quotaQueue->enqueue(fd);
 }
 
 /// removes queue head
 void
 ClientInfo::quotaDequeue()
 {
     assert(quotaQueue);
     quotaQueue->dequeue();
 }
 
 void
 ClientInfo::kickQuotaQueue()
 {
     if (!eventWaiting && !selectWaiting && hasQueue()) {
         // wait at least a second if the bucket is empty
         const double delay = (bucketSize < 1.0) ? 1.0 : 0.0;
         eventAdd("commHandleWriteHelper", &commHandleWriteHelper,
                  quotaQueue, delay, 0, true);
         eventWaiting = true;
     }
 }
 
 /// calculates how much to write for a single dequeued client
 int
-ClientInfo::quotaForDequed()
+ClientInfo::quota()
 {
     /* If we have multiple clients and give full bucketSize to each client then
      * clt1 may often get a lot more because clt1->clt2 time distance in the
      * select(2) callback order may be a lot smaller than cltN->clt1 distance.
      * We divide quota evenly to be more fair. */
 
     if (!rationedCount) {
         rationedCount = quotaQueue->size() + 1;
 
         // The delay in ration recalculation _temporary_ deprives clients from
         // bytes that should have trickled in while rationedCount was positive.
         refillBucket();
 
         // Rounding errors do not accumulate here, but we round down to avoid
         // negative bucket sizes after write with rationedCount=1.
         rationedQuota = static_cast<int>(floor(bucketSize/rationedCount));
         debugs(77,5, HERE << "new rationedQuota: " << rationedQuota <<
                '*' << rationedCount);
     }
 
     --rationedCount;
     debugs(77,7, HERE << "rationedQuota: " << rationedQuota <<
            " rations remaining: " << rationedCount);
 
     // update 'last seen' time to prevent clientdb GC from dropping us
     last_seen = squid_curtime;
     return rationedQuota;
 }
 
-///< adds bytes to the quota bucket based on the rate and passed time
-void
-ClientInfo::refillBucket()
-{
-    // all these times are in seconds, with double precision
-    const double currTime = current_dtime;
-    const double timePassed = currTime - prevTime;
-
-    // Calculate allowance for the time passed. Use double to avoid
-    // accumulating rounding errors for small intervals. For example, always
-    // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error.
-    const double gain = timePassed * writeSpeedLimit;
-
-    debugs(77,5, HERE << currTime << " clt" << (const char*)hash.key << ": " <<
-           bucketSize << " + (" << timePassed << " * " << writeSpeedLimit <<
-           " = " << gain << ')');
-
-    // to further combat error accumulation during micro updates,
-    // quit before updating time if we cannot add at least one byte
-    if (gain < 1.0)
-        return;
-
-    prevTime = currTime;
-
-    // for "first" connections, drain initial fat before refilling but keep
-    // updating prevTime to avoid bursts after the fat is gone
-    if (bucketSize > bucketSizeLimit) {
-        debugs(77,4, HERE << "not refilling while draining initial fat");
-        return;
-    }
-
-    bucketSize += gain;
-
-    // obey quota limits
-    if (bucketSize > bucketSizeLimit)
-        bucketSize = bucketSizeLimit;
+bool
+ClientInfo::applyQuota(int &nleft, Comm::IoCallback *state)
+{
+    assert(hasQueue());
+    assert(quotaPeekFd() == state->conn->fd);
+    quotaDequeue(); // we will write or requeue below
+    if (nleft > 0 && !BandwidthBucket::applyQuota(nleft, state)) {
+        state->quotaQueueReserv = quotaEnqueue(state->conn->fd);
+        kickQuotaQueue();
+        return false;
+    }
+    return true;
+}
+
+void
+ClientInfo::scheduleWrite(Comm::IoCallback *state)
+{
+    if (writeLimitingActive) {
+        state->quotaQueueReserv = quotaEnqueue(state->conn->fd);
+        kickQuotaQueue();
+    }
+}
+
+void
+ClientInfo::onFdClosed()
+{
+    BandwidthBucket::onFdClosed();
+    // kick queue or it will get stuck as commWriteHandle is not called
+    kickQuotaQueue();
+}
+
+void
+ClientInfo::reduceBucket(const int len)
+{
+    if (len > 0)
+        BandwidthBucket::reduceBucket(len);
+    // even if we wrote nothing, we were served; give others a chance
+    kickQuotaQueue();
 }
 
 void
 ClientInfo::setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark)
 {
-    debugs(77,5, HERE << "Write limits for " << (const char*)hash.key <<
+    debugs(77,5, "Write limits for " << (const char*)key <<
            " speed=" << aWriteSpeedLimit << " burst=" << anInitialBurst <<
            " highwatermark=" << aHighWatermark);
 
     // set or possibly update traffic shaping parameters
     writeLimitingActive = true;
     writeSpeedLimit = aWriteSpeedLimit;
     bucketSizeLimit = aHighWatermark;
 
     // but some members should only be set once for a newly activated bucket
     if (firstTimeConnection) {
         firstTimeConnection = false;
 
         assert(!selectWaiting);
         assert(!quotaQueue);
         quotaQueue = new CommQuotaQueue(this);
 
         bucketSize = anInitialBurst;
         prevTime = current_dtime;
     }
 }
 
 CommQuotaQueue::CommQuotaQueue(ClientInfo *info): clientInfo(info),
     ins(0), outs(0)
 {
     assert(clientInfo);
 }
 
 CommQuotaQueue::~CommQuotaQueue()
 {
     assert(!clientInfo); // ClientInfo should clear this before destroying us
 }
 
 /// places the given fd at the end of the queue; returns reservation ID
 unsigned int
 CommQuotaQueue::enqueue(int fd)
 {
-    debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+    debugs(77,5, "clt" << (const char*)clientInfo->key <<
            ": FD " << fd << " with qqid" << (ins+1) << ' ' << fds.size());
     fds.push_back(fd);
     return ++ins;
 }
 
 /// removes queue head
 void
 CommQuotaQueue::dequeue()
 {
     assert(!fds.empty());
-    debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+    debugs(77,5, "clt" << (const char*)clientInfo->key <<
            ": FD " << fds.front() << " with qqid" << (outs+1) << ' ' <<
            fds.size());
     fds.pop_front();
     ++outs;
 }
-#endif
+#endif /* USE_DELAY_POOLS */
 
 /*
  * hm, this might be too general-purpose for all the places we'd
  * like to use it.
  */
 int
 ignoreErrno(int ierrno)
 {
     switch (ierrno) {
 
     case EINPROGRESS:
 
     case EWOULDBLOCK:
 #if EAGAIN != EWOULDBLOCK
 
     case EAGAIN:
 #endif
 
     case EALREADY:
 
     case EINTR:
 #ifdef ERESTART
 
     case ERESTART:
 #endif
 
         return 1;
 
     default:
         return 0;
@@ -1566,61 +1564,70 @@
     return false;
 }
 
 static bool
 writeTimedOut(int fd)
 {
     if (!COMMIO_FD_WRITECB(fd)->active())
         return false;
 
     if ((squid_curtime - fd_table[fd].writeStart) < Config.Timeout.write)
         return false;
 
     return true;
 }
 
 void
 checkTimeouts(void)
 {
     int fd;
     fde *F = NULL;
     AsyncCall::Pointer callback;
 
     for (fd = 0; fd <= Biggest_FD; ++fd) {
         F = &fd_table[fd];
 
         if (writeTimedOut(fd)) {
             // We have an active write callback and we are timed out
             debugs(5, 5, "checkTimeouts: FD " << fd << " auto write timeout");
             Comm::SetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
             COMMIO_FD_WRITECB(fd)->finish(Comm::COMM_ERROR, ETIMEDOUT);
-        } else if (AlreadyTimedOut(F))
+#if USE_DELAY_POOLS
+        } else if (F->writeQuotaHandler != nullptr && COMMIO_FD_WRITECB(fd)->conn != nullptr) {
+            if (!F->writeQuotaHandler->selectWaiting && F->writeQuotaHandler->quota() && !F->closing()) {
+                F->writeQuotaHandler->selectWaiting = true;
+                Comm::SetSelect(fd, COMM_SELECT_WRITE, Comm::HandleWrite, COMMIO_FD_WRITECB(fd), 0);
+            }
+            continue;
+#endif
+        }
+        else if (AlreadyTimedOut(F))
             continue;
 
         debugs(5, 5, "checkTimeouts: FD " << fd << " Expired");
 
         if (F->timeoutHandler != NULL) {
             debugs(5, 5, "checkTimeouts: FD " << fd << ": Call timeout handler");
             callback = F->timeoutHandler;
             F->timeoutHandler = NULL;
             ScheduleCallHere(callback);
         } else {
             debugs(5, 5, "checkTimeouts: FD " << fd << ": Forcing comm_close()");
             comm_close(fd);
         }
     }
 }
 
 /// Start waiting for a possibly half-closed connection to close
 // by scheduling a read callback to a monitoring handler that
 // will close the connection on read errors.
 void
 commStartHalfClosedMonitor(int fd)
 {
     debugs(5, 5, HERE << "adding FD " << fd << " to " << *TheHalfClosed);
     assert(isOpen(fd) && !commHasHalfClosedMonitor(fd));
     (void)TheHalfClosed->add(fd); // could also assert the result
     commPlanHalfClosedCheck(); // may schedule check if we added the first FD
 }
 
 static
 void

=== modified file 'src/comm/IoCallback.cc'
--- src/comm/IoCallback.cc	2017-01-01 00:12:22 +0000
+++ src/comm/IoCallback.cc	2017-01-22 08:42:37 +0000
@@ -42,67 +42,63 @@
 }
 
 /**
  * Configure Comm::Callback for I/O
  *
  * @param fd            filedescriptor
  * @param t             IO callback type (read or write)
  * @param cb            callback
  * @param buf           buffer, if applicable
  * @param func          freefunc, if applicable
  * @param sz            buffer size
  */
 void
 Comm::IoCallback::setCallback(Comm::iocb_type t, AsyncCall::Pointer &cb, char *b, FREE *f, int sz)
 {
     assert(!active());
     assert(type == t);
     assert(cb != NULL);
 
     callback = cb;
     buf = b;
     freefunc = f;
     size = sz;
     offset = 0;
 }
 
 void
 Comm::IoCallback::selectOrQueueWrite()
 {
 #if USE_DELAY_POOLS
-    // stand in line if there is one
-    if (ClientInfo *clientInfo = fd_table[conn->fd].clientInfo) {
-        if (clientInfo->writeLimitingActive) {
-            quotaQueueReserv = clientInfo->quotaEnqueue(conn->fd);
-            clientInfo->kickQuotaQueue();
-            return;
-        }
+    if (BandwidthBucket *bucket = BandwidthBucket::SelectBucket(&fd_table[conn->fd])) {
+        bucket->scheduleWrite(this);
+        return;
     }
 #endif
 
     SetSelect(conn->fd, COMM_SELECT_WRITE, Comm::HandleWrite, this, 0);
 }
 
 void
 Comm::IoCallback::cancel(const char *reason)
 {
     if (!active())
         return;
 
     callback->cancel(reason);
     callback = NULL;
     reset();
 }
 
 void
 Comm::IoCallback::reset()
 {
     conn = NULL;
     if (freefunc) {
         freefunc(buf);
         buf = NULL;
         freefunc = NULL;
     }
     xerrno = 0;
 
 #if USE_DELAY_POOLS
     quotaQueueReserv = 0;

=== modified file 'src/comm/Write.cc'
--- src/comm/Write.cc	2017-01-01 00:12:22 +0000
+++ src/comm/Write.cc	2017-01-22 08:42:37 +0000
@@ -1,152 +1,124 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #include "squid.h"
+#include "cbdata.h"
 #include "comm/Connection.h"
 #include "comm/IoCallback.h"
+#include "comm/Loops.h"
 #include "comm/Write.h"
 #include "fd.h"
 #include "fde.h"
 #include "globals.h"
 #include "MemBuf.h"
 #include "profiler/Profiler.h"
 #include "SquidTime.h"
 #include "StatCounters.h"
 #if USE_DELAY_POOLS
 #include "ClientInfo.h"
 #endif
 
 #include <cerrno>
 
 void
 Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback)
 {
     Comm::Write(conn, mb->buf, mb->size, callback, mb->freeFunc());
 }
 
 void
 Comm::Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
 {
     debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback);
 
     /* Make sure we are open, not closing, and not writing */
     assert(fd_table[conn->fd].flags.open);
     assert(!fd_table[conn->fd].closing());
     Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd);
     assert(!ccb->active());
 
     fd_table[conn->fd].writeStart = squid_curtime;
     ccb->conn = conn;
     /* Queue the write */
     ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size);
     ccb->selectOrQueueWrite();
 }
 
 /** Write to FD.
  * This function is used by the lowest level of IO loop which only has access to FD numbers.
  * We have to use the comm iocb_table to map FD numbers to waiting data and Comm::Connections.
  * Once the write has been concluded we schedule the waiting call with success/fail results.
  */
 void
 Comm::HandleWrite(int fd, void *data)
 {
     Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data);
     int len = 0;
     int nleft;
 
-    assert(state->conn != NULL && state->conn->fd == fd);
+    assert(state->conn != NULL);
+    assert(state->conn->fd == fd);
 
     PROF_start(commHandleWrite);
     debugs(5, 5, HERE << state->conn << ": off " <<
            (long int) state->offset << ", sz " << (long int) state->size << ".");
 
     nleft = state->size - state->offset;
 
 #if USE_DELAY_POOLS
-    ClientInfo * clientInfo=fd_table[fd].clientInfo;
-
-    if (clientInfo && !clientInfo->writeLimitingActive)
-        clientInfo = NULL; // we only care about quota limits here
-
-    if (clientInfo) {
-        assert(clientInfo->selectWaiting);
-        clientInfo->selectWaiting = false;
-
-        assert(clientInfo->hasQueue());
-        assert(clientInfo->quotaPeekFd() == fd);
-        clientInfo->quotaDequeue(); // we will write or requeue below
-
-        if (nleft > 0) {
-            const int quota = clientInfo->quotaForDequed();
-            if (!quota) {  // if no write quota left, queue this fd
-                state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
-                clientInfo->kickQuotaQueue();
-                PROF_stop(commHandleWrite);
-                return;
-            }
-
-            const int nleft_corrected = min(nleft, quota);
-            if (nleft != nleft_corrected) {
-                debugs(5, 5, HERE << state->conn << " writes only " <<
-                       nleft_corrected << " out of " << nleft);
-                nleft = nleft_corrected;
-            }
-
+    BandwidthBucket *bucket = BandwidthBucket::SelectBucket(&fd_table[fd]);
+    if (bucket) {
+        assert(bucket->selectWaiting);
+        bucket->selectWaiting = false;
+        if (nleft > 0 && !bucket->applyQuota(nleft, state)) {
+            PROF_stop(commHandleWrite);
+            return;
         }
     }
 #endif /* USE_DELAY_POOLS */
 
     /* actually WRITE data */
     int xerrno = errno = 0;
     len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
     xerrno = errno;
     debugs(5, 5, HERE << "write() returns " << len);
 
 #if USE_DELAY_POOLS
-    if (clientInfo) {
-        if (len > 0) {
-            /* we wrote data - drain them from bucket */
-            clientInfo->bucketSize -= len;
-            if (clientInfo->bucketSize < 0.0) {
-                debugs(5, DBG_IMPORTANT, HERE << "drained too much"); // should not happen
-                clientInfo->bucketSize = 0;
-            }
-        }
-
-        // even if we wrote nothing, we were served; give others a chance
-        clientInfo->kickQuotaQueue();
+    if (bucket) {
+        /* drain the written bytes (if any) from the bucket */
+        bucket->reduceBucket(len);
     }
 #endif /* USE_DELAY_POOLS */
 
     fd_bytes(fd, len, FD_WRITE);
     ++statCounter.syscalls.sock.writes;
     // After each successful partial write,
     // reset fde::writeStart to the current time.
     fd_table[fd].writeStart = squid_curtime;
 
     if (len == 0) {
         /* Note we even call write if nleft == 0 */
         /* We're done */
         if (nleft != 0)
             debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining.");
 
         state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, 0);
     } else if (len < 0) {
         /* An error */
         if (fd_table[fd].flags.socket_eof) {
             debugs(50, 2, "FD " << fd << " write failure: " << xstrerr(xerrno) << ".");
             state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, xerrno);
         } else if (ignoreErrno(xerrno)) {
             debugs(50, 9, "FD " << fd << " write failure: " << xstrerr(xerrno) << ".");
             state->selectOrQueueWrite();
         } else {
             debugs(50, 2, "FD " << fd << " write failure: " << xstrerr(xerrno) << ".");
             state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, xerrno);
         }
     } else {
         /* A successful write, continue */

=== modified file 'src/fde.h'
--- src/fde.h	2017-01-01 00:12:22 +0000
+++ src/fde.h	2017-01-22 08:42:37 +0000
@@ -1,49 +1,50 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef SQUID_FDE_H
 #define SQUID_FDE_H
 
 #include "comm.h"
 #include "defines.h"
 #include "ip/Address.h"
 #include "ip/forward.h"
 #include "security/forward.h"
 #include "typedefs.h" //DRCB, DWCB
 
 #if USE_DELAY_POOLS
+#include "MessageBucket.h"
 class ClientInfo;
 #endif
 
 /**
  * READ_HANDLER functions return < 0 if, and only if, they fail with an error.
  * On error, they must pass back an error code in 'errno'.
  */
 typedef int READ_HANDLER(int, char *, int);
 
 /**
  * WRITE_HANDLER functions return < 0 if, and only if, they fail with an error.
  * On error, they must pass back an error code in 'errno'.
  */
 typedef int WRITE_HANDLER(int, const char *, int);
 
 class dwrite_q;
 class _fde_disk
 {
 public:
     DWCB *wrt_handle;
     void *wrt_handle_data;
     dwrite_q *write_q;
     dwrite_q *write_q_tail;
     off_t offset;
     _fde_disk() { memset(this, 0, sizeof(_fde_disk)); }
 };
 
 class fde
 {
 
@@ -76,110 +77,112 @@
                                         See also nfmarkFromServer. */
     int sock_family;
     char ipaddr[MAX_IPSTRLEN];            /* dotted decimal address of peer */
     char desc[FD_DESC_SZ];
 
     struct _fde_flags {
         bool open;
         bool close_request; ///< true if file_ or comm_close has been called
         bool write_daemon;
         bool socket_eof;
         bool nolinger;
         bool nonblocking;
         bool ipc;
         bool called_connect;
         bool nodelay;
         bool close_on_exec;
         bool read_pending;
         //bool write_pending; //XXX seems not to be used
         bool transparent;
     } flags;
 
     int64_t bytes_read;
     int64_t bytes_written;
 
     struct {
         int uses;                   /* ie # req's over persistent conn */
     } pconn;
 
 #if USE_DELAY_POOLS
     ClientInfo * clientInfo;/* pointer to client info used in client write limiter or NULL if not present */
+    MessageBucket::Pointer writeQuotaHandler; ///< response write limiter, if configured
 #endif
     unsigned epoll_state;
 
     _fde_disk disk;
     PF *read_handler;
     void *read_data;
     PF *write_handler;
     void *write_data;
     AsyncCall::Pointer timeoutHandler;
     time_t timeout;
     time_t writeStart;
     void *lifetime_data;
     AsyncCall::Pointer closeHandler;
     AsyncCall::Pointer halfClosedReader; /// read handler for half-closed fds
     READ_HANDLER *read_method;
     WRITE_HANDLER *write_method;
     Security::SessionPointer ssl;
     Security::ContextPointer dynamicTlsContext; ///< cached and then freed when fd is closed
 #if _SQUID_WINDOWS_
     struct {
         long handle;
     } win32;
 #endif
     tos_t tosFromServer;                /**< Stores the TOS flags of the packets from the remote server.
                                             See FwdState::dispatch(). Note that this differs to
                                             tosToServer in that this is the value we *receive* from the,
                                             connection, whereas tosToServer is the value to set on packets
                                             *leaving* Squid.  */
     unsigned int nfmarkFromServer;      /**< Stores the Netfilter mark value of the connection from the remote
                                             server. See FwdState::dispatch(). Note that this differs to
                                             nfmarkToServer in that this is the value we *receive* from the,
                                             connection, whereas nfmarkToServer is the value to set on packets
                                             *leaving* Squid.   */
 
     /** Clear the fde class back to NULL equivalent. */
     inline void clear() {
         type = 0;
         remote_port = 0;
         local_addr.setEmpty();
         tosToServer = '\0';
         nfmarkToServer = 0;
         sock_family = 0;
         memset(ipaddr, '\0', MAX_IPSTRLEN);
         memset(desc,'\0',FD_DESC_SZ);
         memset(&flags,0,sizeof(_fde_flags));
         bytes_read = 0;
         bytes_written = 0;
         pconn.uses = 0;
 #if USE_DELAY_POOLS
         clientInfo = NULL;
+        writeQuotaHandler = NULL;
 #endif
         epoll_state = 0;
         read_handler = NULL;
         read_data = NULL;
         write_handler = NULL;
         write_data = NULL;
         timeoutHandler = NULL;
         timeout = 0;
         writeStart = 0;
         lifetime_data = NULL;
         closeHandler = NULL;
         halfClosedReader = NULL;
         read_method = NULL;
         write_method = NULL;
         ssl.reset();
         dynamicTlsContext.reset();
 #if _SQUID_WINDOWS_
         win32.handle = (long)NULL;
 #endif
         tosFromServer = '\0';
         nfmarkFromServer = 0;
     }
 };
 
 #define fd_table fde::Table
 
 int fdNFree(void);
 
 #define FD_READ_METHOD(fd, buf, len) (*fd_table[fd].read_method)(fd, buf, len)
 #define FD_WRITE_METHOD(fd, buf, len) (*fd_table[fd].write_method)(fd, buf, len)

=== modified file 'src/http/Stream.cc'
--- src/http/Stream.cc	2017-01-01 00:12:22 +0000
+++ src/http/Stream.cc	2017-01-22 08:42:37 +0000
@@ -1,45 +1,51 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #include "squid.h"
 #include "client_side_request.h"
 #include "http/Stream.h"
 #include "HttpHdrContRange.h"
 #include "HttpHeaderTools.h"
 #include "Store.h"
 #include "TimeOrTag.h"
+#if USE_DELAY_POOLS
+#include "acl/FilledChecklist.h"
+#include "ClientInfo.h"
+#include "fde.h"
+#include "MessageDelayPools.h"
+#endif
 
 Http::Stream::Stream(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
     clientConnection(aConn),
     http(aReq),
     reply(nullptr),
     writtenToSocket(0),
     mayUseConnection_(false),
     connRegistered_(false)
 {
     assert(http != nullptr);
     memset(reqbuf, '\0', sizeof (reqbuf));
     flags.deferred = 0;
     flags.parsed_ok = 0;
     deferredparams.node = nullptr;
     deferredparams.rep = nullptr;
 }
 
 Http::Stream::~Stream()
 {
     if (auto node = getTail()) {
         if (auto ctx = dynamic_cast<Http::Stream *>(node->data.getRaw())) {
             /* We are *always* the tail - prevent recursive free */
             assert(this == ctx);
             node->data = nullptr;
         }
     }
     httpRequestFree(http);
 }
 
 void
@@ -256,60 +262,78 @@
 }
 
 void
 Http::Stream::sendStartOfMessage(HttpReply *rep, StoreIOBuffer bodyData)
 {
     prepareReply(rep);
     assert(rep);
     MemBuf *mb = rep->pack();
 
     // dump now, so we dont output any body.
     debugs(11, 2, "HTTP Client " << clientConnection);
     debugs(11, 2, "HTTP Client REPLY:\n---------\n" << mb->buf << "\n----------");
 
     /* Save length of headers for persistent conn checks */
     http->out.headers_sz = mb->contentSize();
 #if HEADERS_LOG
     headersLog(0, 0, http->request->method, rep);
 #endif
 
     if (bodyData.data && bodyData.length) {
         if (multipartRangeRequest())
             packRange(bodyData, mb);
         else if (http->request->flags.chunkedReply) {
             packChunk(bodyData, *mb);
         } else {
             size_t length = lengthToSend(bodyData.range());
             noteSentBodyBytes(length);
             mb->append(bodyData.data, length);
         }
     }
+#if USE_DELAY_POOLS
+    for (const auto &pool: MessageDelayPools::Instance()->pools) {
+        if (pool->access) {
+            std::unique_ptr<ACLFilledChecklist> chl(clientAclChecklistCreate(pool->access, http));
+            chl->reply = rep;
+            HTTPMSGLOCK(chl->reply);
+            const allow_t answer = chl->fastCheck();
+            if (answer == ACCESS_ALLOWED) {
+                writeQuotaHandler = pool->createBucket();
+                fd_table[clientConnection->fd].writeQuotaHandler = writeQuotaHandler;
+                break;
+            } else {
+                debugs(83, 4, "Response delay pool " << pool->poolName <<
+                       " skipped because ACL " << answer);
+            }
+        }
+    }
+#endif
 
     getConn()->write(mb);
     delete mb;
 }
 
 void
 Http::Stream::sendBody(StoreIOBuffer bodyData)
 {
     if (!multipartRangeRequest() && !http->request->flags.chunkedReply) {
         size_t length = lengthToSend(bodyData.range());
         noteSentBodyBytes(length);
         getConn()->write(bodyData.data, length);
         return;
     }
 
     MemBuf mb;
     mb.init();
     if (multipartRangeRequest())
         packRange(bodyData, &mb);
     else
         packChunk(bodyData, mb);
 
     if (mb.contentSize())
         getConn()->write(&mb);
     else
         writeComplete(0);
 }
 
 size_t
 Http::Stream::lengthToSend(Range<int64_t> const &available) const

=== modified file 'src/http/Stream.h'
--- src/http/Stream.h	2017-01-01 00:12:22 +0000
+++ src/http/Stream.h	2017-01-22 08:42:37 +0000
@@ -1,44 +1,47 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef SQUID_SRC_HTTP_STREAM_H
 #define SQUID_SRC_HTTP_STREAM_H
 
 #include "http/forward.h"
 #include "mem/forward.h"
 #include "StoreIOBuffer.h"
+#if USE_DELAY_POOLS
+#include "MessageBucket.h"
+#endif
 
 class clientStreamNode;
 class ClientHttpRequest;
 
 namespace Http
 {
 
 /**
  * The processing context for a single HTTP transaction (stream).
  *
  * A stream lifetime extends from directly after a request has been parsed
  * off the client connection buffer, until the last byte of both request
  * and reply payload (if any) have been written, or it is otherwise
  * explicitly terminated.
  *
  * Streams self-register with the Http::Server Pipeline being managed by the
  * Server for the connection on which the request was received.
  *
  * The socket level management and I/O is done by a Server which owns us.
  * The scope of this objects control over a socket consists of the data
  * buffer received from the Server with an initially unknown length.
  * When that length is known it sets the end boundary of our access to the
  * buffer.
  *
  * The individual processing actions are done by other Jobs which we start.
  *
  * When a stream is completed the finished() method needs to be called which
  * will perform all cleanup and deregistration operations. If the reason for
  * finishing is an error, then notifyIoError() needs to be called prior to
  * the finished() method.
@@ -134,36 +137,39 @@
         unsigned parsed_ok:1; ///< Was this parsed correctly?
     } flags;
 
     bool mayUseConnection() const {return mayUseConnection_;}
 
     void mayUseConnection(bool aBool) {
         mayUseConnection_ = aBool;
         debugs(33, 3, "This " << this << " marked " << aBool);
     }
 
     class DeferredParams
     {
 
     public:
         clientStreamNode *node;
         HttpReply *rep;
         StoreIOBuffer queuedBuffer;
     };
 
     DeferredParams deferredparams;
     int64_t writtenToSocket;
 
 private:
     void prepareReply(HttpReply *);
     void packChunk(const StoreIOBuffer &bodyData, MemBuf &);
     void packRange(StoreIOBuffer const &, MemBuf *);
     void doClose();
 
     bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */
     bool connRegistered_;
+#if USE_DELAY_POOLS
+    MessageBucket::Pointer writeQuotaHandler; ///< response write limiter, if configured
+#endif
 };
 
 } // namespace Http
 
 #endif /* SQUID_SRC_HTTP_STREAM_H */
 

=== modified file 'src/tests/Stub.list'
--- src/tests/Stub.list	2017-01-08 02:18:32 +0000
+++ src/tests/Stub.list	2017-01-22 08:42:37 +0000
@@ -1,47 +1,48 @@
 ##
 ## Copyright (C) 1996-2017 The Squid Software Foundation and contributors
 ##
 ## Squid software is distributed under GPLv2+ license and includes
 ## contributions from numerous individuals and organizations.
 ## Please see the COPYING and CONTRIBUTORS files for details.
 ##
 
 STUB_SOURCE= tests/STUB.h \
 	tests/stub_access_log.cc \
 	tests/stub_acl.cc \
 	tests/stub_cache_cf.cc \
 	tests/stub_CacheDigest.cc \
 	tests/stub_cache_manager.cc \
 	tests/stub_carp.cc \
 	tests/stub_cbdata.cc \
 	tests/stub_client_db.cc \
+	tests/stub_ClientDelayConfig.cc \
 	tests/stub_client_side.cc \
 	tests/stub_client_side_request.cc \
 	tests/stub_CollapsedForwarding.cc \
 	tests/stub_comm.cc \
 	tests/stub_CommIO.cc \
 	tests/stub_debug.cc \
 	tests/stub_DelayId.cc \
 	tests/stub_errorpage.cc \
 	tests/stub_ETag.cc \
 	tests/stub_event.cc \
 	tests/stub_EventLoop.cc \
 	tests/stub_external_acl.cc \
 	tests/stub_fatal.cc \
 	tests/stub_fd.cc \
 	tests/stub_helper.cc \
 	tests/stub_HelperChildConfig.cc \
 	tests/stub_http.cc \
 	tests/stub_HttpControlMsg.cc \
 	tests/stub_HttpReply.cc \
 	tests/stub_HttpRequest.cc \
 	tests/stub_icp.cc \
 	tests/stub_internal.cc \
 	tests/stub_ipcache.cc \
 	tests/stub_ipc.cc \
 	tests/stub_ipc_Forwarder.cc \
 	tests/stub_ipc_TypedMsgHdr.cc \
 	tests/stub_libauth_acls.cc \
 	tests/stub_libauth.cc \
 	tests/stub_libcomm.cc \
 	tests/stub_libdiskio.cc \

=== added file 'src/tests/stub_ClientDelayConfig.cc'
--- src/tests/stub_ClientDelayConfig.cc	1970-01-01 00:00:00 +0000
+++ src/tests/stub_ClientDelayConfig.cc	2016-10-26 14:10:51 +0000
@@ -0,0 +1,19 @@
+/*
+ * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+#include "ClientDelayConfig.h"
+#define STUB_API "ClientDelayConfig.cc"
+#include "tests/STUB.h"
+
+ClientDelayConfig::~ClientDelayConfig() STUB_NOP
+
+#endif
+
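
For reviewers who want the quota arithmetic in one place: the comm.cc and
Write.cc hunks above move the refill, "apply quota", and "reduce bucket"
steps out of ClientInfo and Comm::HandleWrite() into the shared
BandwidthBucket base class. Below is a minimal standalone sketch of that
arithmetic. It is modeled on the removed ClientInfo::refillBucket() and the
removed quota code in Comm::HandleWrite(). SketchBucket and its member names
are hypothetical, and the sketch ignores the per-client rationing done by
ClientInfo::quota(); it is not the actual BandwidthBucket implementation.

```cpp
#include <algorithm>

// Hypothetical stand-in for the patch's BandwidthBucket; names and layout
// are illustrative assumptions, not the real Squid class.
struct SketchBucket {
    double bucketLevel; // bytes currently available for writing
    double speedLimit;  // refill rate, in bytes per second
    double sizeLimit;   // maximum bucket level (high watermark)
    double prevTime;    // time of the last successful refill, in seconds

    // Adds the bytes earned since prevTime, following the logic of the
    // removed ClientInfo::refillBucket().
    void refill(const double now) {
        const double gain = (now - prevTime) * speedLimit;
        if (gain < 1.0)
            return; // skip micro updates to avoid accumulating rounding errors
        prevTime = now;
        if (bucketLevel > sizeLimit)
            return; // drain the initial burst before refilling
        bucketLevel = std::min(bucketLevel + gain, sizeLimit);
    }

    // Caps nleft by the available quota. Returns false when nothing may be
    // written now, in which case the caller should requeue the write.
    bool applyQuota(int &nleft, const double now) {
        refill(now);
        const int quota = static_cast<int>(bucketLevel); // round down
        if (quota <= 0)
            return false;
        nleft = std::min(nleft, quota);
        return true;
    }

    // Drains the bytes actually written; the level never goes negative.
    void reduce(const int len) {
        if (len > 0)
            bucketLevel = std::max(0.0, bucketLevel - len);
    }
};
```

The intended call order matches Comm::HandleWrite() in the patch: applyQuota()
before the write, then reduce() with the write() result, including when that
result is zero or negative.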

_______________________________________________
squid-dev mailing list
[email protected]
http://lists.squid-cache.org/listinfo/squid-dev
