To clarify what we are up to:

Pipelining is a queue of requests that are pre-parsed (and sometimes even have a response queued) while some request received earlier is still being fetched. The pipeline queue has several states: disabled, empty, blocked, terminated, and full. Empty is a pretty self-explanatory state. Full is reached at an arbitrary queue length determined by pipeline_prefetch. Blocked is a temporary state that prevents further pipelining until all currently queued requests have completed. Terminated signals an irreversible halt to pipelining on the connection. Blocked and Terminated are precautionary measures to avoid wasting resources and having to unroll a pipeline of request contexts.
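
To make the state terminology concrete, here is a minimal sketch of those states as an enum (illustrative only; the attached patch actually represents them with a pipelineTerminated_ flag plus a pipelineBlockReason_ string on ConnStateData, not an explicit enum):

    // Illustrative sketch only -- not how the patch stores the state.
    enum PipelineQueueState {
        PIPELINE_DISABLED,    // pipeline_prefetch 0, or persistent client connections off
        PIPELINE_EMPTY,       // nothing queued behind the request currently being served
        PIPELINE_BLOCKED,     // paused until all queued requests complete (e.g. CONNECT, request payload)
        PIPELINE_TERMINATED,  // irreversibly halted (e.g. framing errors, Connection:close)
        PIPELINE_FULL         // queue length has reached the pipeline_prefetch limit
    };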

Some of these things already exist, either explicitly or implicitly. This patch:

* changes the pipeline_prefetch directive from an on/off toggle with a hard-coded pipeline queue length of 1 request (+1 being handled) to a numeric queue-length limit, so the point at which the "full" state prevents new request parsing can be set to an arbitrary value (see the example squid.conf fragment after this list).

* adds pipeline state flags to ConnStateData and accessors for the parser to signal blocked or terminated status as identified.

* adds calls to relevant parts of the parser and pre-processing functions to set pipeline blocked or terminated states.

* changes the OkToAddRequest() test function to check all the states and prevent further pipelining if there is (a) any reason not to pre-parse further, or (b) no room in the queue.

* disables pipelining whenever client_persistent_connections is OFF.
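
For example, with this patch applied the directive would be configured along these lines (a hypothetical squid.conf fragment; the value 4 is arbitrary):

    # old form, still parsed but now deprecated:  pipeline_prefetch on
    # new form: a numeric prefetch queue length;  0 disables pre-parsing entirely
    pipeline_prefetch 4

    # required; otherwise pipeline_prefetch is forced back to 0 with a warning
    client_persistent_connections on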


On 22/05/2013 5:00 a.m., Alex Rousskov wrote:
On 05/20/2013 10:59 PM, Amos Jeffries wrote:
<snip>
I've added a loop to scan for Connection:close (and
Proxy-Connection:close), and CONNECT method as blockers on further
pipeline reads. There may be other conditions we find later.
The new loop is problematic, on several levels:

1) We should either

   * assume (or ensure) that when we are called a connection is persistent

or

   * not assume that when getCurrentContext() is NULL the connection is
persistent.

I recommend the first approach because it is simpler, does not add new
state, and avoids code duplication. This is the approach used before
your changes AFAICT.

We are HTTP/1.1 now. We can and do assume the connection is persistent unless some signal is available that explicitly means it is closed.
Before and after the loop was added AFAICT.

  However, if you disagree, then the
"getCurrentContext() is NULL implies that the connection is persistent"
assumption above should be removed and the next two items will also apply:


2) We should not implement "connection is persistent after this request"
check inside the loop. It should be either a method of
ClientSocketContext() or a ConnStateData member, depending on whether
you remove the assumption discussed above.

There is a ! operator on that test which you may have overlooked. This is the check for "Connection:close is present --> no more prefetching". In all other cases we are assuming persistence. That may or may not be right, but note that this function is only called when there is *already* a series of bytes in the read buffer and we are simply deciding whether to parse them now (assuming they are a new request header) or wait to find out whether they actually are a header block.

NP: in the attached patch, after the loop removal, the same persistence check is run right after the HttpHeaders are all parsed. It then updates the ConnStateData pipeline state to disable any future requests if non-persistence is seen.

On the whole I am thinking there are a lot more things we should be blocking on which are not tested for yet, and real-world traffic exposure will require further additions. While moving the block/disable/no-change checks into the clientProcessRequest() function I have added more tests: the presence of a request payload now temporarily blocks the pipeline, just like CONNECT, and all sorts of "framing" errors in the early parser now disable pipelining entirely to avoid potential request smuggling situations.

Yes, I agree that some of these events are possibly not necessary to disable or block on. However, pipelining is itself a traffic optimization which is largely unused today anyway. So by allowing >0 queued requests for any portion of a connection's lifetime we are already gaining. Disabling or pausing it at the first hint of trouble seems to be the right (conservative) way to go until we have more experience and testing to prove that enabling it is fine.



+    // XXX: check that pipeline queue length is no more than M seconds long already.
+    // For long-delay queues we need to push back at the client via leaving things in TCP buffers.
+    // By time M all queued objects are already at risk of getting client-timeout errors.
I do not think we should do this, at least not by default, so this is
not an XXX. I would remove that comment, but if you want to keep it,
please rephrase it as an optional feature. It would be rather difficult
for Squid to measure the time those requests spent in TCP buffers and
also predict whether the client still wants those requests to go ahead.
Some clients would not submit all requests at once and will not time out submitted requests as long as they keep receiving response(s), for example.

FYI: using time+count for queue length instead of count alone is one of the main design models to come out of the bufferbloat project, as a major source of efficient, high-performance queue management.

As to implementation ... each queued request should have a timestamp somewhere in its state data for the time of first read and the time its headers were completely read. I am thinking we can compare one of those against the current timestamp for the last queued request and know how long it has been waiting. That timestamp is just missing or unknown to me right now, and it is an optimization anyway, so it is not implemented yet.
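
Roughly what I have in mind, as a sketch only (the tail accessor, the timestamp member and the "M" limit are all hypothetical; none of them exist in the attached patch):

    // Hypothetical addition to ConnStateData::pipelineOkToAddRequest():
    // refuse to grow the queue once the most recently queued request has
    // been waiting longer than M seconds, leaving further client bytes
    // sitting in the TCP buffers.
    const ClientSocketContext::Pointer last = getTailContext();       // hypothetical accessor
    if (last != NULL) {
        const time_t waited = squid_curtime - last->headersParsedAt;  // hypothetical timestamp member
        if (waited > pipelineMaxQueueAge)                             // "M", not a real directive yet
            return false;
    }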

I have altered the XXX to a TODO.

Amos
=== modified file 'doc/release-notes/release-3.4.sgml'
--- doc/release-notes/release-3.4.sgml  2013-05-14 17:54:30 +0000
+++ doc/release-notes/release-3.4.sgml  2013-05-16 03:38:06 +0000
@@ -179,6 +179,9 @@
        <p>New format code <em>%note</em> to log a transaction annotation linked to the
           transaction by ICAP, eCAP, a helper, or the <em>note</em> squid.conf directive.
 
+       <tag>pipeline_prefetch</tag>
+       <p>Updated to take a numeric count of prefetched pipeline requests instead of ON/OFF.
+
        <tag>unlinkd_program</tag>
        <p>New helper response format utilizing result codes <em>OK</em> and <em>BH</em>,
           to signal helper lookup results. Also, key-value response values to return

=== modified file 'src/Parsing.cc'
--- src/Parsing.cc      2013-05-04 13:14:23 +0000
+++ src/Parsing.cc      2013-05-16 13:55:49 +0000
@@ -33,6 +33,7 @@
 #include "squid.h"
 #include "cache_cf.h"
 #include "compat/strtoll.h"
+#include "ConfigParser.h"
 #include "Parsing.h"
 #include "globals.h"
 #include "Debug.h"
@@ -161,7 +162,7 @@
 int
 GetInteger(void)
 {
-    char *token = strtok(NULL, w_space);
+    char *token = ConfigParser::strtokFile();
     int i;
 
     if (token == NULL)

=== modified file 'src/SquidConfig.h'
--- src/SquidConfig.h   2013-05-13 03:57:03 +0000
+++ src/SquidConfig.h   2013-05-16 03:21:38 +0000
@@ -332,7 +332,6 @@
 
         int ie_refresh;
         int vary_ignore_expire;
-        int pipeline_prefetch;
         int surrogate_is_remote;
         int request_entities;
         int detect_broken_server_pconns;
@@ -361,6 +360,8 @@
         int client_dst_passthru;
     } onoff;
 
+    int pipeline_max_prefetch;
+
     int forward_max_tries;
     int connect_retries;
 

=== modified file 'src/cache_cf.cc'
--- src/cache_cf.cc     2013-05-14 18:36:45 +0000
+++ src/cache_cf.cc     2013-05-22 12:17:28 +0000
@@ -965,6 +965,16 @@
                (uint32_t)Config.maxRequestBufferSize, (uint32_t)Config.maxRequestHeaderSize);
     }
 
+    /*
+     * Disable client side request pipelining if client_persistent_connections OFF.
+     * Waste of resources queueing any pipelined requests when the first will close the connection.
+     */
+    if (Config.pipeline_max_prefetch > 0 && !Config.onoff.client_pconns) {
+        debugs(3, DBG_PARSE_NOTE(DBG_IMPORTANT), "WARNING: pipeline_prefetch " << Config.pipeline_max_prefetch <<
+                   " requires client_persistent_connections ON. Forced pipeline_prefetch 0.");
+        Config.pipeline_max_prefetch = 0;
+    }
+
 #if USE_AUTH
     /*
      * disable client side request pipelining. There is a race with
@@ -973,12 +983,12 @@
      * pipelining OFF, the client may fail to authenticate, but squid's
      * state will be preserved.
      */
-    if (Config.onoff.pipeline_prefetch) {
+    if (Config.pipeline_max_prefetch > 0) {
         Auth::Config *nego = Auth::Config::Find("Negotiate");
         Auth::Config *ntlm = Auth::Config::Find("NTLM");
         if ((nego && nego->active()) || (ntlm && ntlm->active())) {
-            debugs(3, DBG_IMPORTANT, "WARNING: pipeline_prefetch breaks NTLM and Negotiate authentication. Forced OFF.");
-            Config.onoff.pipeline_prefetch = 0;
+            debugs(3, DBG_PARSE_NOTE(DBG_IMPORTANT), "WARNING: pipeline_prefetch breaks NTLM and Negotiate authentication. Forced pipeline_prefetch 0.");
+            Config.pipeline_max_prefetch = 0;
         }
     }
 #endif
@@ -2691,6 +2701,29 @@
 
 #define free_tristate free_int
 
+void
+parse_pipelinePrefetch(int *var)
+{
+    char *token = ConfigParser::strtokFile();
+
+    if (token == NULL)
+        self_destruct();
+
+    if (!strcmp(token, "on")) {
+        debugs(0, DBG_PARSE_NOTE(DBG_IMPORTANT), "WARNING: 'pipeline_prefetch on' is deprecated. Please update to use 1 (or a higher number).");
+        *var = 1;
+    } else if (!strcmp(token, "off")) {
+        debugs(0, DBG_PARSE_NOTE(2), "WARNING: 'pipeline_prefetch off' is deprecated. Please update to use '0'.");
+        *var = 0;
+    } else {
+        ConfigParser::strtokFileUndo();
+        parse_int(var);
+    }
+}
+
+#define free_pipelinePrefetch free_int
+#define dump_pipelinePrefetch dump_int
+
 static void
 dump_refreshpattern(StoreEntry * entry, const char *name, RefreshPattern * head)
 {

=== modified file 'src/cf.data.depend'
--- src/cf.data.depend  2013-05-12 05:04:14 +0000
+++ src/cf.data.depend  2013-05-16 04:00:13 +0000
@@ -51,6 +51,7 @@
 onoff
 peer
 peer_access            cache_peer acl
+pipelinePrefetch
 PortCfg
 QosConfig
 refreshpattern

=== modified file 'src/cf.data.pre'
--- src/cf.data.pre     2013-05-14 17:53:18 +0000
+++ src/cf.data.pre     2013-05-21 02:50:03 +0000
@@ -8701,17 +8701,23 @@
 DOC_END
 
 NAME: pipeline_prefetch
-TYPE: onoff
-LOC: Config.onoff.pipeline_prefetch
-DEFAULT: off
+TYPE: pipelinePrefetch
+LOC: Config.pipeline_max_prefetch
+DEFAULT: 0
+DEFAULT_DOC: Do not pre-parse pipelined requests.
 DOC_START
-       To boost the performance of pipelined requests to closer
-       match that of a non-proxied environment Squid can try to fetch
-       up to two requests in parallel from a pipeline.
+       HTTP clients may send a pipeline of 1+N requests to Squid using a
+       single connection, without waiting for Squid to respond to the first
+       of those requests. This option limits the number of concurrent
+       requests Squid will try to handle in parallel. If set to N, Squid
+       will try to receive and process up to 1+N requests on the same
+       connection concurrently.
 
-       Defaults to off for bandwidth management and access logging
+       Defaults to 0 (off) for bandwidth management and access logging
        reasons.
 
+       NOTE: pipelining requires persistent connections to clients.
+
        WARNING: pipelining breaks NTLM and Negotiate/Kerberos authentication.
 DOC_END
 

=== modified file 'src/client_side.cc'
--- src/client_side.cc  2013-05-23 08:18:09 +0000
+++ src/client_side.cc  2013-05-24 14:48:34 +0000
@@ -919,6 +919,7 @@
 #if USE_SSL
     delete sslServerBump;
 #endif
+    xfree(pipelineBlockReason_);
 }
 
 /**
@@ -1991,6 +1992,12 @@
 static ClientSocketContext *
 parseHttpRequestAbort(ConnStateData * csd, const char *uri)
 {
+    // if the parser detected header frame problems and cannot even parse it
+    // we cannot pipeline anything following the possibly invalid frame end.
+    csd->pipelineDisable(uri);
+
+    // XXX: why is this not using quitAfterError()?
+
     ClientHttpRequest *http;
     ClientSocketContext *context;
     StoreIOBuffer tempBuffer;
@@ -2302,6 +2309,10 @@
     /* Set method_p */
     *method_p = HttpRequestMethod(&hp->buf[hp->req.m_start], &hp->buf[hp->req.m_end]+1);
 
+    /* CONNECT requests block pipelining if allowed. */
+    if (*method_p == Http::METHOD_CONNECT)
+        csd->pipelineBlock("CONNECT request");
+
     /* deny CONNECT via accelerated ports */
     if (*method_p == Http::METHOD_CONNECT && csd->port && csd->port->flags.accelSurrogate) {
         debugs(33, DBG_IMPORTANT, "WARNING: CONNECT method received on " << csd->port->protocol << " Accelerator port " << csd->port->s.GetPort() );
@@ -2567,6 +2578,7 @@
     if (request)
         request->flags.proxyKeepalive = false;
     flags.readMore = false;
+    pipelineDisable("will close after error");
     debugs(33,4, HERE << "Will close after error: " << clientConnection);
 }
 
@@ -2747,6 +2759,10 @@
         goto finish;
     }
 
+    // if client disabled persistence we should stop pipelining more requests.
+    if (!request->persistent())
+        conn->pipelineDisable("client will close");
+
     request->clientConnectionManager = conn;
 
     request->flags.accelerated = http->flags.accel;
@@ -2887,6 +2903,7 @@
         // consume header early so that body pipe gets just the body
         connNoteUseOfBuffer(conn, http->req_sz);
         notedUseOfBuffer = true;
+        conn->pipelineBlock("request payload");
 
         /* Is it too large? */
         if (!chunked && // if chunked, we will check as we accumulate
@@ -2933,6 +2950,7 @@
     if (request != NULL && request->flags.resetTcp && Comm::IsConnOpen(conn->clientConnection)) {
         debugs(33, 3, HERE << "Sending TCP RST on " << conn->clientConnection);
         conn->flags.readMore = false;
+        conn->pipelineDisable("Sending TCP RST");
         comm_reset_close(conn->clientConnection);
     }
 }
@@ -2946,17 +2964,79 @@
     }
 }
 
-static int
-connOkToAddRequest(ConnStateData * conn)
-{
-    int result = conn->getConcurrentRequestCount() < (Config.onoff.pipeline_prefetch ? 2 : 1);
-
-    if (!result) {
-        debugs(33, 3, HERE << conn->clientConnection << " max concurrent requests reached");
-        debugs(33, 5, HERE << conn->clientConnection << " defering new request until one is done");
-    }
-
-    return result;
+bool
+ConnStateData::pipelineEnabled() const
+{
+    return !pipelineTerminated_ && Config.pipeline_max_prefetch > 0;
+}
+
+void
+ConnStateData::pipelineDisable(const char *reason)
+{
+    debugs(33, 4, clientConnection << " pipeline terminated because " << 
reason);
+
+    if (pipelineTerminated_) {
+        debugs(33, 3, clientConnection << " pipeline already terminated by: " 
<< pipelineBlockReason_);
+        return;
+    }
+
+    pipelineTerminated_ = true;
+    xfree(pipelineBlockReason_); // may have been temporarily blocked. Terminate overrides that.
+    pipelineBlockReason_ = xstrdup(reason);
+}
+
+void
+ConnStateData::pipelineBlock(const char *reason)
+{
+    debugs(33, 4, clientConnection << " pipeline queue blocked because " << 
reason);
+
+    if (pipelineBlockReason_) {
+        debugs(33, 4, "pipeline queue already blocked due to: " << 
pipelineBlockReason_);
+        return;
+    }
+
+    pipelineBlockReason_ = xstrdup(reason);
+}
+
+bool
+ConnStateData::pipelineOkToAddRequest()
+{
+    // if client already sent Connection:close or some equivalent, we stop prefetching
+    if (pipelineTerminated_) {
+        debugs(33, 3, clientConnection << " deferring new concurrent request due to " << pipelineBlockReason_);
+        return false;
+    }
+
+    const int existingRequestCount = getConcurrentRequestCount();
+
+    // if whatever request caused the temporary block has been completed, re-open the pipeline
+    if (existingRequestCount < 1) {
+        safe_free(pipelineBlockReason_);
+        return true;
+    }
+
+    // when still blocked
+    if (pipelineBlockReason_ != NULL) {
+        debugs(33, 3, clientConnection << " defering new concurrent request 
due to " << pipelineBlockReason_);
+        return false;
+    }
+
+    // default to the configured pipeline size.
+    // add 1 because the head of pipeline is counted in concurrent requests and not prefetch queue
+    const int concurrentRequestLimit = Config.pipeline_max_prefetch + 1;
+
+    // when queue filled already we can't add more.
+    if (existingRequestCount >= concurrentRequestLimit) {
+        debugs(33, 3, clientConnection << " max concurrent requests reached (" 
<< concurrentRequestLimit << ")");
+        debugs(33, 5, clientConnection << " defering new request until one is 
done");
+        return false;
+    }
+
+    // TODO: check that pipeline queue length is no more than M seconds long already.
+    // For long-delay queues we need to push back at the client via leaving things in TCP buffers.
+    // By time M all queued objects are already at risk of getting client-timeout errors.
+
+    return true;
 }
 
 /**
@@ -2981,8 +3061,8 @@
         if (in.notYetUsed == 0)
             break;
 
-        /* Limit the number of concurrent requests to 2 */
-        if (!connOkToAddRequest(this)) {
+        /* Limit the number of concurrent requests */
+        if (!pipelineOkToAddRequest()) {
             break;
         }
 

=== modified file 'src/client_side.h'
--- src/client_side.h   2013-04-04 06:15:00 +0000
+++ src/client_side.h   2013-05-24 11:24:55 +0000
@@ -178,7 +178,7 @@
 /**
  * Manages a connection to a client.
  *
- * Multiple requests (up to 2) can be pipelined. This object is responsible for managing
+ * Multiple requests (up to pipeline_prefetch) can be pipelined. This object is responsible for managing
  * which one is currently being fulfilled and what happens to the queue if the current one
  * causes the client connection to be closed early.
  *
@@ -209,6 +209,17 @@
     ClientSocketContext::Pointer getCurrentContext() const;
     void addContextToQueue(ClientSocketContext * context);
     int getConcurrentRequestCount() const;
+
+    /// whether pipelining on this connection is enabled
+    /// Note that this will return true even if the pipeline is temporarily blocked or queue is full.
+    bool pipelineEnabled() const;
+    /// whether it is okay to parse and queue another pipelined request right now
+    bool pipelineOkToAddRequest();
+    /// temporarily block pipelining until the currently queued requests are all completed.
+    void pipelineBlock(const char *reason);
+    /// terminate pipelining any further requests due to the reason given.
+    void pipelineDisable(const char *reason);
+
     bool isOpen() const;
     void checkHeaderLimits();
 
@@ -419,6 +430,11 @@
     /// the reason why we no longer read the request or nil
     const char *stoppedReceiving_;
 
+    /// whether pipelining on this connection has reached a state where it must stop completely.
+    bool pipelineTerminated_;
+    /// the reason why we will not prefetch new pipeline requests, or nil
+    const char *pipelineBlockReason_;
+
     AsyncCall::Pointer reader; ///< set when we are reading
     BodyPipe::Pointer bodyPipe; // set when we are reading request body
 
