The patch.

Στις 26/07/2017 12:37 μμ, ο Christos Tsantilas έγραψε:
Squid can be killed or maimed by enough clients that start multi-step connection authentication but never follow up with the second HTTP request while keeping their HTTP connection open. Affected helpers remain in the "reserved" state and cannot be reused for other clients. Observed helper exhaustion has happened without any malicious intent.

To address the problem, we add a helper reservation timeout. Timed out reserved helpers may be reused by new clients/connections. To minimize problems with slow-to-resume-authentication clients, timed out reserved helpers are not reused until there are no unreserved running helpers left. The reservations are tracked using unique integer IDs.

Also fixed Squid crashes caused by unexpected helper termination -- the raw UserRequest::authserver pointer could point to a deleted helper.

This is a Measurement Factory project.
_______________________________________________
squid-dev mailing list
[email protected]
http://lists.squid-cache.org/listinfo/squid-dev

Reuse reserved Negotiate and NTLM helpers after an idle timeout.

Squid can be killed or maimed by enough clients that start multi-step
connection authentication but never follow up with the second HTTP
request while keeping their HTTP connection open. Affected helpers
remain in the "reserved" state and cannot be reused for other clients.
Observed helper exhaustion has happened without any malicious intent.

To address the problem, we add a helper reservation timeout. Timed out
reserved helpers may be reused by new clients/connections. To minimize
problems with slow-to-resume-authentication clients, timed out reserved
helpers are not reused until there are no unreserved running helpers left.
The reservations are tracked using unique integer IDs.

Also fixed Squid crashes caused by unexpected helper termination -- the
raw UserRequest::authserver pointer could point to a deleted helper.

This is a Measurement Factory project.

=== modified file 'src/auth/negotiate/UserRequest.cc'
--- src/auth/negotiate/UserRequest.cc	2017-02-05 09:40:21 +0000
+++ src/auth/negotiate/UserRequest.cc	2017-07-25 08:57:37 +0000
@@ -11,41 +11,40 @@
 #include "auth/CredentialsCache.h"
 #include "auth/negotiate/Config.h"
 #include "auth/negotiate/User.h"
 #include "auth/negotiate/UserRequest.h"
 #include "auth/State.h"
 #include "auth/User.h"
 #include "client_side.h"
 #include "fatal.h"
 #include "format/Format.h"
 #include "globals.h"
 #include "helper.h"
 #include "helper/Reply.h"
 #include "http/Stream.h"
 #include "HttpHeaderTools.h"
 #include "HttpReply.h"
 #include "HttpRequest.h"
 #include "MemBuf.h"
 #include "SquidTime.h"
 
 Auth::Negotiate::UserRequest::UserRequest() :
-    authserver(nullptr),
     server_blob(nullptr),
     client_blob(nullptr),
     waiting(0),
     request(nullptr)
 {}
 
 Auth::Negotiate::UserRequest::~UserRequest()
 {
     assert(LockCount()==0);
     safe_free(server_blob);
     safe_free(client_blob);
 
     releaseAuthServer();
 
     if (request) {
         HTTPMSGUNLOCK(request);
         request = NULL;
     }
 }
 
@@ -147,54 +146,54 @@
         if (keyExtras)
             printResult = snprintf(buf, sizeof(buf), "KK %s %s\n", client_blob, keyExtras);
         else
             printResult = snprintf(buf, sizeof(buf), "KK %s\n", client_blob);
     }
 
     if (printResult < 0 || printResult >= (int)sizeof(buf)) {
         if (printResult < 0)
             debugs(29, DBG_CRITICAL, "ERROR: Can not build negotiate authentication helper request");
         else
             debugs(29, DBG_CRITICAL, "ERROR: Negotiate authentication helper request too big for the " << sizeof(buf) << "-byte buffer");
         handler(data);
         return;
     }
 
     waiting = 1;
 
     safe_free(client_blob);
 
     helperStatefulSubmit(negotiateauthenticators, buf, Auth::Negotiate::UserRequest::HandleReply,
-                         new Auth::StateData(this, handler, data), authserver);
+                         new Auth::StateData(this, handler, data), reservationId);
 }
 
 /**
  * Atomic action: properly release the Negotiate auth helpers which may have been reserved
  * for this request connections use.
  */
 void
 Auth::Negotiate::UserRequest::releaseAuthServer()
 {
-    if (authserver) {
-        debugs(29, 6, HERE << "releasing Negotiate auth server '" << authserver << "'");
-        helperStatefulReleaseServer(authserver);
-        authserver = NULL;
+    if (reservationId) {
+        debugs(29, 6, reservationId);
+        negotiateauthenticators->cancelReservation(reservationId);
+        reservationId.clear();
     } else
         debugs(29, 6, HERE << "No Negotiate auth server to release.");
 }
 
 void
 Auth::Negotiate::UserRequest::authenticate(HttpRequest * aRequest, ConnStateData * conn, Http::HdrType type)
 {
     /* Check that we are in the client side, where we can generate
      * auth challenges */
 
     if (conn == NULL || !cbdataReferenceValid(conn)) {
         user()->credentials(Auth::Failed);
         debugs(29, DBG_IMPORTANT, "WARNING: Negotiate Authentication attempt to perform authentication without a connection!");
         return;
     }
 
     if (waiting) {
         debugs(29, DBG_IMPORTANT, "WARNING: Negotiate Authentication waiting for helper reply!");
         return;
     }
@@ -249,71 +248,71 @@
         request = aRequest;
         HTTPMSGLOCK(request);
         break;
 
     case Auth::Ok:
         fatal("Auth::Negotiate::UserRequest::authenticate: unexpected auth state DONE! Report a bug to the squid developers.\n");
         break;
 
     case Auth::Failed:
         /* we've failed somewhere in authentication */
         debugs(29, 9, HERE << "auth state negotiate failed. " << proxy_auth);
         break;
     }
 }
 
 void
 Auth::Negotiate::UserRequest::HandleReply(void *data, const Helper::Reply &reply)
 {
     Auth::StateData *r = static_cast<Auth::StateData *>(data);
 
-    debugs(29, 8, HERE << "helper: '" << reply.whichServer << "' sent us reply=" << reply);
+    debugs(29, 8, reply.reservationId << " got reply=" << reply);
 
     if (!cbdataReferenceValid(r->data)) {
-        debugs(29, DBG_IMPORTANT, "ERROR: Negotiate Authentication invalid callback data. helper '" << reply.whichServer << "'.");
+        debugs(29, DBG_IMPORTANT, "ERROR: Negotiate Authentication invalid callback data (" << reply.reservationId << ")");
         delete r;
         return;
     }
 
     Auth::UserRequest::Pointer auth_user_request = r->auth_user_request;
     assert(auth_user_request != NULL);
 
     // add new helper kv-pair notes to the credentials object
     // so that any transaction using those credentials can access them
     auth_user_request->user()->notes.appendNewOnly(&reply.notes);
     // remove any private credentials detail which got added.
     auth_user_request->user()->notes.remove("token");
 
     Auth::Negotiate::UserRequest *lm_request = dynamic_cast<Auth::Negotiate::UserRequest *>(auth_user_request.getRaw());
     assert(lm_request != NULL);
     assert(lm_request->waiting);
 
     lm_request->waiting = 0;
     safe_free(lm_request->client_blob);
 
     assert(auth_user_request->user() != NULL);
     assert(auth_user_request->user()->auth_type == Auth::AUTH_NEGOTIATE);
 
-    if (lm_request->authserver == NULL)
-        lm_request->authserver = reply.whichServer.get(); // XXX: no locking?
+    if (!lm_request->reservationId)
+        lm_request->reservationId = reply.reservationId;
     else
-        assert(reply.whichServer == lm_request->authserver);
+        assert(reply.reservationId == lm_request->reservationId);
 
     switch (reply.result) {
     case Helper::TT:
         /* we have been given a blob to send to the client */
         safe_free(lm_request->server_blob);
         lm_request->request->flags.mustKeepalive = true;
         if (lm_request->request->flags.proxyKeepalive) {
             const char *tokenNote = reply.notes.findFirst("token");
             lm_request->server_blob = xstrdup(tokenNote);
             auth_user_request->user()->credentials(Auth::Handshake);
             auth_user_request->setDenyMessage("Authentication in progress");
             debugs(29, 4, HERE << "Need to challenge the client with a server token: '" << tokenNote << "'");
         } else {
             auth_user_request->user()->credentials(Auth::Failed);
             auth_user_request->setDenyMessage("Negotiate authentication requires a persistent connection");
         }
         break;
 
     case Helper::Okay: {
         const char *userNote = reply.notes.findFirst("user");
@@ -351,41 +350,41 @@
         /* set these to now because this is either a new login from an
          * existing user or a new user */
         local_auth_user->expiretime = current_time.tv_sec;
         auth_user_request->user()->credentials(Auth::Ok);
         debugs(29, 4, HERE << "Successfully validated user via Negotiate. Username '" << auth_user_request->user()->username() << "'");
     }
     break;
 
     case Helper::Error:
         /* authentication failure (wrong password, etc.) */
         auth_user_request->denyMessageFromHelper("Negotiate", reply);
         auth_user_request->user()->credentials(Auth::Failed);
         safe_free(lm_request->server_blob);
         if (const char *tokenNote = reply.notes.findFirst("token"))
             lm_request->server_blob = xstrdup(tokenNote);
         lm_request->releaseAuthServer();
         debugs(29, 4, "Failed validating user via Negotiate. Result: " << reply);
         break;
 
     case Helper::Unknown:
-        debugs(29, DBG_IMPORTANT, "ERROR: Negotiate Authentication Helper '" << reply.whichServer << "' crashed!.");
+        debugs(29, DBG_IMPORTANT, "ERROR: Negotiate Authentication Helper crashed (" << reply.reservationId << ")");
     /* continue to the next case */
 
     case Helper::TimedOut:
     case Helper::BrokenHelper:
         /* TODO kick off a refresh process. This can occur after a YR or after
          * a KK. If after a YR release the helper and resubmit the request via
          * Authenticate Negotiate start.
          * If after a KK deny the user's request w/ 407 and mark the helper as
          * Needing YR. */
         if (reply.result == Helper::Unknown)
             auth_user_request->setDenyMessage("Internal Error");
         else
             auth_user_request->denyMessageFromHelper("Negotiate", reply);
         auth_user_request->user()->credentials(Auth::Failed);
         safe_free(lm_request->server_blob);
         lm_request->releaseAuthServer();
         debugs(29, DBG_IMPORTANT, "ERROR: Negotiate Authentication validating user. Result: " << reply);
         break;
     }
 

=== modified file 'src/auth/negotiate/UserRequest.h'
--- src/auth/negotiate/UserRequest.h	2017-01-01 00:12:22 +0000
+++ src/auth/negotiate/UserRequest.h	2017-07-18 10:38:30 +0000
@@ -1,68 +1,70 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef _SQUID_SRC_AUTH_NEGOTIATE_USERREQUEST_H
 #define _SQUID_SRC_AUTH_NEGOTIATE_USERREQUEST_H
 
 #if HAVE_AUTH_MODULE_NEGOTIATE
 
 #include "auth/UserRequest.h"
 #include "helper/forward.h"
+#include "helper/ReservationId.h"
 
 class ConnStateData;
 class HttpReply;
 class HttpRequest;
 
 namespace Auth
 {
 namespace Negotiate
 {
 
 class UserRequest : public Auth::UserRequest
 {
     MEMPROXY_CLASS(Auth::Negotiate::UserRequest);
 
 public:
     UserRequest();
     virtual ~UserRequest();
     virtual int authenticated() const;
     virtual void authenticate(HttpRequest * request, ConnStateData * conn, Http::HdrType type);
     virtual Direction module_direction();
     virtual void startHelperLookup(HttpRequest *request, AccessLogEntry::Pointer &al, AUTHCB *, void *);
     virtual const char *credentialsStr();
 
     virtual const char * connLastHeader();
 
-    /* we need to store the helper server between requests */
-    helper_stateful_server *authserver;
     void releaseAuthServer(void); ///< Release the authserver helper server properly.
 
     /* what connection is this associated with */
     /* ConnStateData * conn;*/
 
     /* our current blob to pass to the client */
     char *server_blob;
     /* our current blob to pass to the server */
     char *client_blob;
 
     /* currently waiting for helper response */
     unsigned char waiting;
 
     /* need access to the request flags to mess around on pconn failure */
     HttpRequest *request;
 
+    /// a helper-issued reservation locking the helper state between
+    /// HTTP requests
+    Helper::ReservationId reservationId;
 private:
     static HLPCB HandleReply;
 };
 
 } // namespace Negotiate
 } // namespace Auth
 
 #endif /* HAVE_AUTH_MODULE_NEGOTIATE */
 #endif /* _SQUID_SRC_AUTH_NEGOTIATE_USERREQUEST_H */
 

=== modified file 'src/auth/ntlm/UserRequest.cc'
--- src/auth/ntlm/UserRequest.cc	2017-02-16 11:51:56 +0000
+++ src/auth/ntlm/UserRequest.cc	2017-07-25 08:57:09 +0000
@@ -9,41 +9,40 @@
 #include "squid.h"
 #include "AccessLogEntry.h"
 #include "auth/CredentialsCache.h"
 #include "auth/ntlm/Config.h"
 #include "auth/ntlm/User.h"
 #include "auth/ntlm/UserRequest.h"
 #include "auth/State.h"
 #include "cbdata.h"
 #include "client_side.h"
 #include "fatal.h"
 #include "format/Format.h"
 #include "globals.h"
 #include "helper.h"
 #include "helper/Reply.h"
 #include "http/Stream.h"
 #include "HttpRequest.h"
 #include "MemBuf.h"
 #include "SquidTime.h"
 
 Auth::Ntlm::UserRequest::UserRequest() :
-    authserver(nullptr),
     server_blob(nullptr),
     client_blob(nullptr),
     waiting(0),
     request(nullptr)
 {}
 
 Auth::Ntlm::UserRequest::~UserRequest()
 {
     assert(LockCount()==0);
     safe_free(server_blob);
     safe_free(client_blob);
 
     releaseAuthServer();
 
     if (request) {
         HTTPMSGUNLOCK(request);
         request = NULL;
     }
 }
 
@@ -140,54 +139,54 @@
             printResult = snprintf(buf, sizeof(buf), "YR %s\n", client_blob); //CHECKME: can ever client_blob be 0 here?
     } else {
         if (keyExtras)
             printResult = snprintf(buf, sizeof(buf), "KK %s %s\n", client_blob, keyExtras);
         else
             printResult = snprintf(buf, sizeof(buf), "KK %s\n", client_blob);
     }
     waiting = 1;
 
     if (printResult < 0 || printResult >= (int)sizeof(buf)) {
         if (printResult < 0)
             debugs(29, DBG_CRITICAL, "ERROR: Can not build ntlm authentication helper request");
         else
             debugs(29, DBG_CRITICAL, "ERROR: Ntlm authentication helper request too big for the " << sizeof(buf) << "-byte buffer.");
         handler(data);
         return;
     }
 
     safe_free(client_blob);
     helperStatefulSubmit(ntlmauthenticators, buf, Auth::Ntlm::UserRequest::HandleReply,
-                         new Auth::StateData(this, handler, data), authserver);
+                         new Auth::StateData(this, handler, data), reservationId);
 }
 
 /**
  * Atomic action: properly release the NTLM auth helpers which may have been reserved
  * for this request connections use.
  */
 void
 Auth::Ntlm::UserRequest::releaseAuthServer()
 {
-    if (authserver) {
-        debugs(29, 6, HERE << "releasing NTLM auth server '" << authserver << "'");
-        helperStatefulReleaseServer(authserver);
-        authserver = NULL;
+    if (reservationId) {
+        debugs(29, 6, reservationId);
+        ntlmauthenticators->cancelReservation(reservationId);
+        reservationId.clear();
     } else
         debugs(29, 6, HERE << "No NTLM auth server to release.");
 }
 
 void
 Auth::Ntlm::UserRequest::authenticate(HttpRequest * aRequest, ConnStateData * conn, Http::HdrType type)
 {
     /* Check that we are in the client side, where we can generate
      * auth challenges */
 
     if (conn == NULL || !cbdataReferenceValid(conn)) {
         user()->credentials(Auth::Failed);
         debugs(29, DBG_IMPORTANT, "WARNING: NTLM Authentication attempt to perform authentication without a connection!");
         return;
     }
 
     if (waiting) {
         debugs(29, DBG_IMPORTANT, "WARNING: NTLM Authentication waiting for helper reply!");
         return;
     }
@@ -243,71 +242,71 @@
         request = aRequest;
         HTTPMSGLOCK(request);
         break;
 
     case Auth::Ok:
         fatal("Auth::Ntlm::UserRequest::authenticate: unexpect auth state DONE! Report a bug to the squid developers.\n");
         break;
 
     case Auth::Failed:
         /* we've failed somewhere in authentication */
         debugs(29, 9, HERE << "auth state ntlm failed. " << proxy_auth);
         break;
     }
 }
 
 void
 Auth::Ntlm::UserRequest::HandleReply(void *data, const Helper::Reply &reply)
 {
     Auth::StateData *r = static_cast<Auth::StateData *>(data);
 
-    debugs(29, 8, HERE << "helper: '" << reply.whichServer << "' sent us reply=" << reply);
+    debugs(29, 8, reply.reservationId << " got reply=" << reply);
 
     if (!cbdataReferenceValid(r->data)) {
-        debugs(29, DBG_IMPORTANT, "ERROR: NTLM Authentication invalid callback data. helper '" << reply.whichServer << "'.");
+        debugs(29, DBG_IMPORTANT, "ERROR: NTLM Authentication invalid callback data(" << reply.reservationId <<")");
         delete r;
         return;
     }
 
     Auth::UserRequest::Pointer auth_user_request = r->auth_user_request;
     assert(auth_user_request != NULL);
 
     // add new helper kv-pair notes to the credentials object
     // so that any transaction using those credentials can access them
     auth_user_request->user()->notes.appendNewOnly(&reply.notes);
     // remove any private credentials detail which got added.
     auth_user_request->user()->notes.remove("token");
 
     Auth::Ntlm::UserRequest *lm_request = dynamic_cast<Auth::Ntlm::UserRequest *>(auth_user_request.getRaw());
     assert(lm_request != NULL);
     assert(lm_request->waiting);
 
     lm_request->waiting = 0;
     safe_free(lm_request->client_blob);
 
     assert(auth_user_request->user() != NULL);
     assert(auth_user_request->user()->auth_type == Auth::AUTH_NTLM);
 
-    if (lm_request->authserver == NULL)
-        lm_request->authserver = reply.whichServer.get(); // XXX: no locking?
+    if (!lm_request->reservationId)
+        lm_request->reservationId = reply.reservationId;
     else
-        assert(reply.whichServer == lm_request->authserver);
+        assert(lm_request->reservationId == reply.reservationId);
 
     switch (reply.result) {
     case Helper::TT:
         /* we have been given a blob to send to the client */
         safe_free(lm_request->server_blob);
         lm_request->request->flags.mustKeepalive = true;
         if (lm_request->request->flags.proxyKeepalive) {
             const char *serverBlob = reply.notes.findFirst("token");
             lm_request->server_blob = xstrdup(serverBlob);
             auth_user_request->user()->credentials(Auth::Handshake);
             auth_user_request->setDenyMessage("Authentication in progress");
             debugs(29, 4, HERE << "Need to challenge the client with a server token: '" << serverBlob << "'");
         } else {
             auth_user_request->user()->credentials(Auth::Failed);
             auth_user_request->setDenyMessage("NTLM authentication requires a persistent connection");
         }
         break;
 
     case Helper::Okay: {
         /* we're finished, release the helper */
@@ -343,41 +342,41 @@
             auth_user_request->user(local_auth_user);
         }
         /* set these to now because this is either a new login from an
          * existing user or a new user */
         local_auth_user->expiretime = current_time.tv_sec;
         auth_user_request->user()->credentials(Auth::Ok);
         debugs(29, 4, HERE << "Successfully validated user via NTLM. Username '" << auth_user_request->user()->username() << "'");
     }
     break;
 
     case Helper::Error:
         /* authentication failure (wrong password, etc.) */
         auth_user_request->denyMessageFromHelper("NTLM", reply);
         auth_user_request->user()->credentials(Auth::Failed);
         safe_free(lm_request->server_blob);
         lm_request->releaseAuthServer();
         debugs(29, 4, "Failed validating user via NTLM. Result: " << reply);
         break;
 
     case Helper::Unknown:
-        debugs(29, DBG_IMPORTANT, "ERROR: NTLM Authentication Helper '" << reply.whichServer << "' crashed!.");
+        debugs(29, DBG_IMPORTANT, "ERROR: NTLM Authentication Helper crashed (" << reply.reservationId << ")");
     /* continue to the next case */
 
     case Helper::TimedOut:
     case Helper::BrokenHelper:
         /* TODO kick off a refresh process. This can occur after a YR or after
          * a KK. If after a YR release the helper and resubmit the request via
          * Authenticate NTLM start.
          * If after a KK deny the user's request w/ 407 and mark the helper as
          * Needing YR. */
         if (reply.result == Helper::Unknown)
             auth_user_request->setDenyMessage("Internal Error");
         else
             auth_user_request->denyMessageFromHelper("NTLM", reply);
         auth_user_request->user()->credentials(Auth::Failed);
         safe_free(lm_request->server_blob);
         lm_request->releaseAuthServer();
         debugs(29, DBG_IMPORTANT, "ERROR: NTLM Authentication validating user. Result: " << reply);
         break;
     }
 

=== modified file 'src/auth/ntlm/UserRequest.h'
--- src/auth/ntlm/UserRequest.h	2017-01-01 00:12:22 +0000
+++ src/auth/ntlm/UserRequest.h	2017-07-18 10:38:53 +0000
@@ -1,66 +1,68 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef _SQUID_SRC_AUTH_NTLM_USERREQUEST_H
 #define _SQUID_SRC_AUTH_NTLM_USERREQUEST_H
 
 #if HAVE_AUTH_MODULE_NTLM
 
 #include "auth/UserRequest.h"
 #include "helper/forward.h"
+#include "helper/ReservationId.h"
 
 class ConnStateData;
 class HttpReply;
 class HttpRequest;
 
 namespace Auth
 {
 namespace Ntlm
 {
 
 class UserRequest : public Auth::UserRequest
 {
     MEMPROXY_CLASS(Auth::Ntlm::UserRequest);
 
 public:
     UserRequest();
     virtual ~UserRequest();
     virtual int authenticated() const;
     virtual void authenticate(HttpRequest * request, ConnStateData * conn, Http::HdrType type);
     virtual Auth::Direction module_direction();
     virtual void startHelperLookup(HttpRequest *req, AccessLogEntry::Pointer &al, AUTHCB *, void *);
     virtual const char *credentialsStr();
 
     virtual const char * connLastHeader();
 
-    /* we need to store the helper server between requests */
-    helper_stateful_server *authserver;
     virtual void releaseAuthServer(); ///< Release authserver NTLM helpers properly when finished or abandoning.
 
     /* our current blob to pass to the client */
     char *server_blob;
 
     /* our current blob to pass to the server */
     char *client_blob;
 
     /* currently waiting for helper response */
     unsigned char waiting;
 
     /* need access to the request flags to mess around on pconn failure */
     HttpRequest *request;
 
+    /// a helper-issued reservation locking the helper state between
+    /// HTTP requests
+    Helper::ReservationId reservationId;
 private:
     static HLPCB HandleReply;
 };
 
 } // namespace Ntlm
 } // namespace Auth
 
 #endif /* HAVE_AUTH_MODULE_NTLM */
 #endif /* _SQUID_SRC_AUTH_NTLM_USERREQUEST_H */
 

=== modified file 'src/cf.data.pre'
--- src/cf.data.pre	2017-06-12 20:26:41 +0000
+++ src/cf.data.pre	2017-07-18 10:42:13 +0000
@@ -551,40 +551,41 @@
 		Avoid adding frequently changing information to key_extras. For
 		example, if you add user source IP, and it changes frequently
 		in your environment, then max_user_ip ACL is going to treat
 		every user+IP combination as a unique "user", breaking the ACL
 		and wasting a lot of memory on those user records. It will also
 		force users to authenticate from scratch whenever their IP
 		changes.
 
 	"realm" string
 		Specifies the protection scope (aka realm name) which is to be
 		reported to the client for the authentication scheme. It is
 		commonly part of the text the user will see when prompted for
 		their username and password.
 
 		For Basic the default is "Squid proxy-caching web server".
 		For Digest there is no default, this parameter is mandatory.
 		For NTLM and Negotiate this parameter is ignored.
 
 	"children" numberofchildren [startup=N] [idle=N] [concurrency=N]
 		[queue-size=N] [on-persistent-overload=action]
+		[reservation-timeout=seconds]
 
 		The maximum number of authenticator processes to spawn. If
 		you start too few Squid will have to wait for them to process
 		a backlog of credential verifications, slowing it down. When
 		password verifications are done via a (slow) network you are
 		likely to need lots of authenticator processes.
 
 		The startup= and idle= options permit some skew in the exact
 		amount run. A minimum of startup=N will begin during startup
 		and reconfigure. Squid will start more in groups of up to
 		idle=N in an attempt to meet traffic needs and to keep idle=N
 		free above those traffic needs up to the maximum.
 
 		The concurrency= option sets the number of concurrent requests
 		the helper can process.  The default of 0 is used for helpers
 		who only supports one request at a time. Setting this to a
 		number greater than 0 changes the protocol used to include a
 		channel ID field first on the request/response line, allowing
 		multiple requests to be sent to the same helper in parallel
 		without waiting for the response.
@@ -600,40 +601,56 @@
 		on-persistent-overload option applies.
 
 		The on-persistent-overload=action option specifies Squid
 		reaction to a new helper request arriving when the helper
 		has been overloaded for more that 3 minutes already. The number
 		of queued requests determines whether the helper is overloaded
 		(see the queue-size option).
 
 		Two actions are supported:
 
 		  die	Squid worker quits. This is the default behavior.
 
 		  ERR	Squid treats the helper request as if it was
 			immediately submitted, and the helper immediately
 			replied with an ERR response. This action has no effect
 			on the already queued and in-progress helper requests.
 
 		NOTE: NTLM and Negotiate schemes do not support concurrency
 			in the Squid code module even though some helpers can.
 
+		The reservation-timeout=seconds option allows NTLM and Negotiate
+		helpers to forget about clients that abandon their in-progress
+		connection authentication without closing the connection. The
+		timeout is measured since the last helper response received by
+		Squid for the client. Fractional seconds are not supported.
+
+		After the timeout, the helper will be used for other clients if
+		there are no unreserved helpers available. In the latter case,
+		the old client attempt to resume authentication will not be
+		forwarded to the helper (and the client should open a new HTTP
+		connection and retry authentication from scratch).
+
+		By default, reservations do not expire and clients that keep
+		their connections open without completing authentication may
+		exhaust all NTLM and Negotiate helpers.
+
 	"keep_alive" on|off
 		If you experience problems with PUT/POST requests when using
 		the NTLM or Negotiate schemes then you can try setting this
 		to off. This will cause Squid to forcibly close the connection
 		on the initial request where the browser asks which schemes
 		are supported by the proxy.
 
 		For Basic and Digest this parameter is ignored.
 
 	"utf8" on|off
 		HTTP uses iso-latin-1 as character set, while some
 		authentication backends such as LDAP expects UTF-8. If this is
 		set to on Squid will translate the HTTP iso-latin-1 charset to
 		UTF-8 before sending the username and password to the helper.
 
 		For NTLM and Negotiate this parameter is ignored.
 
 IF HAVE_AUTH_MODULE_BASIC
 	=== Basic authentication parameters ===
 

=== modified file 'src/helper.cc'
--- src/helper.cc	2017-06-20 22:50:03 +0000
+++ src/helper.cc	2017-07-26 09:02:18 +0000
@@ -25,45 +25,43 @@
 #include "SquidConfig.h"
 #include "SquidIpc.h"
 #include "SquidMath.h"
 #include "SquidTime.h"
 #include "Store.h"
 #include "wordlist.h"
 
 // helper_stateful_server::data uses explicit alloc()/freeOne() */
 #include "mem/Pool.h"
 
 #define HELPER_MAX_ARGS 64
 
 /// The maximum allowed request retries.
 #define MAX_RETRIES 2
 
 /// Helpers input buffer size.
 const size_t ReadBufSize(32*1024);
 
 static IOCB helperHandleRead;
 static IOCB helperStatefulHandleRead;
-static void helperServerFree(helper_server *srv);
-static void helperStatefulServerFree(helper_stateful_server *srv);
 static void Enqueue(helper * hlp, Helper::Xaction *);
 static helper_server *GetFirstAvailable(const helper * hlp);
-static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper * hlp);
+static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
 static void helperDispatch(helper_server * srv, Helper::Xaction * r);
 static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
 static void helperKickQueue(helper * hlp);
 static void helperStatefulKickQueue(statefulhelper * hlp);
 static void helperStatefulServerDone(helper_stateful_server * srv);
 static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
 
 CBDATA_CLASS_INIT(helper);
 CBDATA_CLASS_INIT(helper_server);
 CBDATA_CLASS_INIT(statefulhelper);
 CBDATA_CLASS_INIT(helper_stateful_server);
 
 InstanceIdDefinitions(HelperServerBase, "Hlpr");
 
 void
 HelperServerBase::initStats()
 {
     stats.uses=0;
     stats.replies=0;
     stats.pending=0;
@@ -104,40 +102,113 @@
     shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
 #endif
 
     flags.closing = true;
     if (readPipe->fd == writePipe->fd)
         readPipe->fd = -1;
     writePipe->close();
 
 #if _SQUID_WINDOWS_
     if (hIpc) {
         if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
             getCurrentTime();
             debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
                    " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
         }
         CloseHandle(hIpc);
     }
 #endif
 }
 
+void 
+HelperServerBase::dropQueued()
+{
+    while (!requests.empty()) {
+        // XXX: re-schedule these on another helper?
+        Helper::Xaction *r = requests.front();
+        requests.pop_front();
+        void *cbdata;
+        if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
+            r->reply.result = Helper::Unknown;
+            r->request.callback(cbdata, r->reply);
+        }
+
+        delete r;
+    }
+}
+
+HelperServerBase::~HelperServerBase()
+{
+    if (rbuf) {
+        memFreeBuf(rbuf_sz, rbuf);
+        rbuf = NULL;
+    }
+}
+
+helper_server::~helper_server()
+{
+    wqueue->clean();
+    delete wqueue;
+
+    if (writebuf) {
+        writebuf->clean();
+        delete writebuf;
+        writebuf = NULL;
+    }
+
+    if (Comm::IsConnOpen(writePipe))
+        closeWritePipeSafely(parent->id_name);
+
+    dlinkDelete(&link, &parent->servers);
+
+    assert(parent->childs.n_running > 0);
+    -- parent->childs.n_running;
+
+    assert(requests.empty());
+    cbdataReferenceDone(parent);
+}
+
+void
+helper_server::dropQueued()
+{
+    HelperServerBase::dropQueued();
+    requestsIndex.clear();
+}
+
+helper_stateful_server::~helper_stateful_server()
+{
+    /* TODO: walk the local queue of requests and carry them all out */
+    if (Comm::IsConnOpen(writePipe))
+        closeWritePipeSafely(parent->id_name);
+
+    parent->cancelReservation(reservationId);
+
+    dlinkDelete(&link, &parent->servers);
+
+    assert(parent->childs.n_running > 0);
+    -- parent->childs.n_running;
+
+    assert(requests.empty());
+
+    cbdataReferenceDone(parent);
+}
+
 void
 helperOpenServers(helper * hlp)
 {
     char *s;
     char *progname;
     char *shortname;
     char *procname;
     const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
     char fd_note_buf[FD_DESC_SZ];
     helper_server *srv;
     int nargs = 0;
     int k;
     pid_t pid;
     int rfd;
     int wfd;
     void * hIpc;
     wordlist *w;
 
     if (hlp->cmdline == NULL)
         return;
@@ -210,41 +281,41 @@
         srv->replyXaction = NULL;
         srv->ignoreToEom = false;
         srv->parent = cbdataReference(hlp);
         dlinkAddTail(srv, &srv->link, &hlp->servers);
 
         if (rfd == wfd) {
             snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
             fd_note(rfd, fd_note_buf);
         } else {
             snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
             fd_note(rfd, fd_note_buf);
             snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
             fd_note(wfd, fd_note_buf);
         }
 
         commSetNonBlocking(rfd);
 
         if (wfd != rfd)
             commSetNonBlocking(wfd);
 
-        AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
+        AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_server::HelperServerClosed", cbdataDialer(helper_server::HelperServerClosed, srv));
         comm_add_close_handler(rfd, closeCall);
 
         if (hlp->timeout && hlp->childs.concurrency) {
             AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
                                              CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
             commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
         }
 
         AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
                                              CommIoCbPtrFun(helperHandleRead, srv));
         comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
     }
 
     hlp->last_restart = squid_curtime;
     safe_free(shortname);
     safe_free(procname);
     helperKickQueue(hlp);
 }
 
 /**
@@ -307,69 +378,69 @@
         void * hIpc;
         pid_t pid = ipcCreate(hlp->ipc_type,
                               progname,
                               args,
                               shortname,
                               hlp->addr,
                               &rfd,
                               &wfd,
                               &hIpc);
 
         if (pid < 0) {
             debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
             continue;
         }
 
         ++ hlp->childs.n_running;
         ++ hlp->childs.n_active;
         helper_stateful_server *srv = new helper_stateful_server;
         srv->hIpc = hIpc;
         srv->pid = pid;
-        srv->flags.reserved = false;
         srv->initStats();
         srv->addr = hlp->addr;
         srv->readPipe = new Comm::Connection;
         srv->readPipe->fd = rfd;
         srv->writePipe = new Comm::Connection;
         srv->writePipe->fd = wfd;
         srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
         srv->roffset = 0;
         srv->parent = cbdataReference(hlp);
+        srv->reservationStart = 0;
 
         dlinkAddTail(srv, &srv->link, &hlp->servers);
 
         if (rfd == wfd) {
             snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
             fd_note(rfd, fd_note_buf);
         } else {
             snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
             fd_note(rfd, fd_note_buf);
             snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
             fd_note(wfd, fd_note_buf);
         }
 
         commSetNonBlocking(rfd);
 
         if (wfd != rfd)
             commSetNonBlocking(wfd);
 
-        AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
+        AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_stateful_server::HelperServerClosed", cbdataDialer(helper_stateful_server::HelperServerClosed, srv));
         comm_add_close_handler(rfd, closeCall);
 
         AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
                                              CommIoCbPtrFun(helperStatefulHandleRead, srv));
         comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
     }
 
     hlp->last_restart = squid_curtime;
     safe_free(shortname);
     safe_free(procname);
     helperStatefulKickQueue(hlp);
 }
 
 void
 helper::submitRequest(Helper::Xaction *r)
 {
     helper_server *srv;
 
     if ((srv = GetFirstAvailable(this)))
         helperDispatch(srv, r);
@@ -468,104 +539,152 @@
 
 bool
 helper::trySubmit(const char *buf, HLPCB * callback, void *data)
 {
     if (!prepSubmit())
         return false; // request was dropped
 
     submit(buf, callback, data); // will send or queue
     return true; // request submitted or queued
 }
 
 /// dispatches or enqueues a helper requests; does not enforce queue limits
 void
 helper::submit(const char *buf, HLPCB * callback, void *data)
 {
     Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
     submitRequest(r);
     debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
 }
 
-/// lastserver = "server last used as part of a reserved request sequence"
+/// Submit request or callback the caller with a Helper::Error error.
+/// If the reservation is not set then reserves a new helper.
 void
-helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
+helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
 {
-    if (!hlp || !hlp->trySubmit(buf, callback, data, lastserver))
+    if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
         SubmissionFailure(hlp, callback, data);
 }
 
 /// If possible, submit request. Otherwise, either kill Squid or return false.
 bool
-statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, helper_stateful_server *lastserver)
+statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
 {
     if (!prepSubmit())
         return false; // request was dropped
 
-    submit(buf, callback, data, lastserver); // will send or queue
+    submit(buf, callback, data, reservation); // will send or queue
     return true; // request submitted or queued
 }
 
-void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
+void
+statefulhelper::reserveServer(helper_stateful_server * srv)
 {
-    Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
+    // clear any old reservation
+    if (srv->reserved()) {
+        reservations.erase(srv->reservationId);
+        srv->clearReservation();
+    }
+
+    srv->reserve();
+    reservations.insert(Reservations::value_type(srv->reservationId, srv));
+}
+
+void
+statefulhelper::cancelReservation(const Helper::ReservationId reservation)
+{
+    const auto it = reservations.find(reservation);
+    if (it == reservations.end())
+        return;
+
+    helper_stateful_server *srv = it->second;
+    reservations.erase(it);
+    srv->clearReservation();
+
+    // schedule a queue kick
+    AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
+    ScheduleCallHere(call);
+}
+
+helper_stateful_server *
+statefulhelper::findServer(const Helper::ReservationId & reservation)
+{
+    const auto it = reservations.find(reservation);
+    if (it == reservations.end())
+        return nullptr;
+    return it->second;
+}
+
+void
+helper_stateful_server::reserve()
+{
+    assert(!reservationId);
+    reservationStart = squid_curtime;
+    reservationId = Helper::ReservationId::Next();
+    debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
+}
 
-    if ((buf != NULL) && lastserver) {
-        debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
-        assert(lastserver->flags.reserved);
-        assert(!lastserver->requests.size());
+void
+helper_stateful_server::clearReservation()
+{
+    debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
+    if (!reservationId)
+        return;
+
+    ++stats.releases;
+
+    reservationId.clear();
+    reservationStart = 0;
+}
+
+void
+statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
+{
+    Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
 
+    if (buf && reservation) {
+        debugs(84, 5, reservation);
+        helper_stateful_server *lastServer = findServer(reservation);
+        if (!lastServer) {
+            debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
+            r->reply.result = Helper::TimedOut;
+            r->request.callback(r->request.data, r->reply);
+            delete r;
+            return;
+        }
         debugs(84, 5, "StatefulSubmit dispatching");
-        helperStatefulDispatch(lastserver, r);
+        helperStatefulDispatch(lastServer, r);
     } else {
         helper_stateful_server *srv;
         if ((srv = StatefulGetFirstAvailable(this))) {
+            reserveServer(srv);
             helperStatefulDispatch(srv, r);
         } else
             StatefulEnqueue(this, r);
     }
 
     debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
            "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
 
     syncQueueStats();
 }
 
-/**
- * DPW 2007-05-08
- *
- * helperStatefulReleaseServer tells the helper that whoever was
- * using it no longer needs its services.
- */
-void
-helperStatefulReleaseServer(helper_stateful_server * srv)
-{
-    debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
-    if (!srv->flags.reserved)
-        return;
-
-    ++ srv->stats.releases;
-
-    srv->flags.reserved = false;
-
-    helperStatefulServerDone(srv);
-}
-
 void
 helper::packStatsInto(Packable *p, const char *label) const
 {
     if (label)
         p->appendf("%s:\n", label);
 
     p->appendf("  program: %s\n", cmdline->key);
     p->appendf("  number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
     p->appendf("  requests sent: %d\n", stats.requests);
     p->appendf("  replies received: %d\n", stats.replies);
     p->appendf("  requests timedout: %d\n", stats.timedout);
     p->appendf("  queue length: %d\n", stats.queue_size);
     p->appendf("  avg service time: %d msec\n", stats.avg_svc_time);
     p->append("\n",1);
     p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
                "ID #",
                "FD",
                "PID",
                "# Requests",
                "# Replies",
@@ -573,41 +692,41 @@
                "Flags",
                "Time",
                "Offset",
                "Request");
 
     for (dlink_node *link = servers.head; link; link = link->next) {
         HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
         assert(srv);
         Helper::Xaction *xaction = srv->requests.empty() ? NULL : srv->requests.front();
         double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
         p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
                    srv->index.value,
                    srv->readPipe->fd,
                    srv->pid,
                    srv->stats.uses,
                    srv->stats.replies,
                    srv->stats.timedout,
                    srv->stats.pending ? 'B' : ' ',
                    srv->flags.writing ? 'W' : ' ',
                    srv->flags.closing ? 'C' : ' ',
-                   srv->flags.reserved ? 'R' : ' ',
+                   srv->reserved() ? 'R' : ' ',
                    srv->flags.shutdown ? 'S' : ' ',
                    xaction && xaction->request.placeholder ? 'P' : ' ',
                    tt < 0.0 ? 0.0 : tt,
                    (int) srv->roffset,
                    xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
     }
 
     p->append("\nFlags key:\n"
               "   B\tBUSY\n"
               "   W\tWRITING\n"
               "   C\tCLOSING\n"
               "   R\tRESERVED\n"
               "   S\tSHUTDOWN PENDING\n"
               "   P\tPLACEHOLDER\n", 101);
 }
 
 bool
 helper::willOverload() const {
     return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
 }
@@ -661,202 +780,123 @@
 
         if (srv->flags.shutdown) {
             debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
             continue;
         }
 
         assert(hlp->childs.n_active > 0);
         -- hlp->childs.n_active;
         srv->flags.shutdown = true; /* request it to shut itself down */
 
         if (srv->stats.pending) {
             debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
             continue;
         }
 
         if (srv->flags.closing) {
             debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
             continue;
         }
 
-        if (srv->flags.reserved) {
+        if (srv->reserved()) {
             if (shutting_down) {
                 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
             } else {
                 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
                 continue;
             }
         }
 
         debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
 
         /* the rest of the details is dealt with in the helperStatefulServerFree
          * close handler
          */
         srv->closePipesSafely(hlp->id_name);
     }
 }
 
 helper::~helper()
 {
     /* note, don't free id_name, it probably points to static memory */
 
     // TODO: if the queue is not empty it will leak Helper::Request's
     if (!queue.empty())
         debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
 }
 
-/* ====================================================================== */
-/* LOCAL FUNCTIONS */
-/* ====================================================================== */
-
-static void
-helperServerFree(helper_server *srv)
+void
+helper::handleKilledServer(HelperServerBase *srv, bool &needsNewServers)
 {
-    helper *hlp = srv->parent;
-    int concurrency = hlp->childs.concurrency;
-
-    if (!concurrency)
-        concurrency = 1;
-
-    if (srv->rbuf) {
-        memFreeBuf(srv->rbuf_sz, srv->rbuf);
-        srv->rbuf = NULL;
-    }
-
-    srv->wqueue->clean();
-    delete srv->wqueue;
-
-    if (srv->writebuf) {
-        srv->writebuf->clean();
-        delete srv->writebuf;
-        srv->writebuf = NULL;
-    }
-
-    if (Comm::IsConnOpen(srv->writePipe))
-        srv->closeWritePipeSafely(hlp->id_name);
-
-    dlinkDelete(&srv->link, &hlp->servers);
-
-    assert(hlp->childs.n_running > 0);
-    -- hlp->childs.n_running;
-
+    needsNewServers = false;
     if (!srv->flags.shutdown) {
-        assert(hlp->childs.n_active > 0);
-        -- hlp->childs.n_active;
-        debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
+        assert(childs.n_active > 0);
+        --childs.n_active;
+        debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
 
-        if (hlp->childs.needNew() > 0) {
-            debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
+        if (childs.needNew() > 0) {
+            debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << childs.needNew() << "/" << childs.n_max << ")");
 
-            if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
+            if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
                 if (srv->stats.replies < 1)
-                    fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
+                    fatalf("The %s helpers are crashing too rapidly, need help!\n", id_name);
                 else
-                    debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
+                    debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
             }
-
-            debugs(80, DBG_IMPORTANT, "Starting new helpers");
-            helperOpenServers(hlp);
-        }
-    }
-
-    while (!srv->requests.empty()) {
-        // XXX: re-schedule these on another helper?
-        Helper::Xaction *r = srv->requests.front();
-        srv->requests.pop_front();
-        void *cbdata;
-
-        if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
-            r->reply.result = Helper::Unknown;
-            r->request.callback(cbdata, r->reply);
+            srv->flags.shutdown = true;
+            needsNewServers = true;
         }
-
-        delete r;
     }
-    srv->requestsIndex.clear();
-
-    cbdataReferenceDone(srv->parent);
-    delete srv;
 }
 
-static void
-helperStatefulServerFree(helper_stateful_server *srv)
+void
+helper_server::HelperServerClosed(helper_server *srv)
 {
-    statefulhelper *hlp = srv->parent;
+    helper *hlp = srv->getParent();
 
-    if (srv->rbuf) {
-        memFreeBuf(srv->rbuf_sz, srv->rbuf);
-        srv->rbuf = NULL;
+    bool needsNewServers = false;
+    hlp->handleKilledServer(srv, needsNewServers);
+    if (needsNewServers) {
+        debugs(80, DBG_IMPORTANT, "Starting new helpers");
+        helperOpenServers(hlp);
     }
 
-#if 0
-    srv->wqueue->clean();
+    srv->dropQueued();
 
-    delete srv->wqueue;
-
-#endif
-
-    /* TODO: walk the local queue of requests and carry them all out */
-    if (Comm::IsConnOpen(srv->writePipe))
-        srv->closeWritePipeSafely(hlp->id_name);
-
-    dlinkDelete(&srv->link, &hlp->servers);
-
-    assert(hlp->childs.n_running > 0);
-    -- hlp->childs.n_running;
-
-    if (!srv->flags.shutdown) {
-        assert( hlp->childs.n_active > 0);
-        -- hlp->childs.n_active;
-        debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
-
-        if (hlp->childs.needNew() > 0) {
-            debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
-
-            if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
-                if (srv->stats.replies < 1)
-                    fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
-                else
-                    debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
-            }
-
-            debugs(80, DBG_IMPORTANT, "Starting new helpers");
-            helperStatefulOpenServers(hlp);
-        }
-    }
-
-    while (!srv->requests.empty()) {
-        // XXX: re-schedule these on another helper?
-        Helper::Xaction *r = srv->requests.front();
-        srv->requests.pop_front();
-        void *cbdata;
+    delete srv;
+}
 
-        if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
-            r->reply.result = Helper::Unknown;
-            r->request.callback(cbdata, r->reply);
-        }
+// XXX: Almost duplicates helper_server::HelperServerClosed() because helperOpenServers() is not a virtual method of the `helper` class
+// TODO: Fix the `helper` class hierarchy to use CbdataParent and virtual functions.
+void
+helper_stateful_server::HelperServerClosed(helper_stateful_server *srv)
+{
+    statefulhelper *hlp = static_cast<statefulhelper *>(srv->getParent());
 
-        delete r;
+    bool needsNewServers = false;
+    hlp->handleKilledServer(srv, needsNewServers);
+    if (needsNewServers) {
+        debugs(80, DBG_IMPORTANT, "Starting new helpers");
+        helperStatefulOpenServers(hlp);
     }
 
-    cbdataReferenceDone(srv->parent);
+    srv->dropQueued();
 
     delete srv;
 }
 
 Helper::Xaction *
 helper_server::popRequest(int request_number)
 {
     Helper::Xaction *r = nullptr;
     helper_server::RequestIndex::iterator it;
     if (parent->childs.concurrency) {
         // If concurency supported retrieve request from ID
         it = requestsIndex.find(request_number);
         if (it != requestsIndex.end()) {
             r = *(it->second);
             requests.erase(it->second);
             requestsIndex.erase(it);
         }
     } else if(!requests.empty()) {
         // Else get the first request from queue, if any
         r = requests.front();
@@ -1103,63 +1143,63 @@
                "helper that overflowed " << srv->rbuf_sz << "-byte " <<
                "Squid input buffer: " << hlp->id_name << " #" << srv->index);
         srv->closePipesSafely(hlp->id_name);
         return;
     }
     /**
      * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
      *      Doing this prohibits concurrency support with multiple replies per read().
      * TODO: check that read() setup on these buffers pays attention to roffest!=0
      * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
      */
     srv->roffset = 0;
 
     if (t) {
         /* end of reply found */
         srv->requests.pop_front(); // we already have it in 'r'
         int called = 1;
 
         if (r && cbdataReferenceValid(r->request.data)) {
             r->reply.finalize();
-            r->reply.whichServer = srv;
+            r->reply.reservationId = srv->reservationId;
             r->request.callback(r->request.data, r->reply);
         } else {
             debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
             called = 0;
         }
 
         delete r;
 
         -- srv->stats.pending;
         ++ srv->stats.replies;
 
         ++ hlp->stats.replies;
         srv->answer_time = current_time;
         hlp->stats.avg_svc_time =
             Math::intAverage(hlp->stats.avg_svc_time,
                              tvSubMsec(srv->dispatch_time, current_time),
                              hlp->stats.replies, REDIRECT_AV_FACTOR);
 
         if (called)
             helperStatefulServerDone(srv);
         else
-            helperStatefulReleaseServer(srv);
+            hlp->cancelReservation(srv->reservationId);
     }
 
     if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
         int spaceSize = srv->rbuf_sz - 1;
 
         AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
                                              CommIoCbPtrFun(helperStatefulHandleRead, srv));
         comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
     }
 }
 
 /// Handles a request when all running helpers, if any, are busy.
 static void
 Enqueue(helper * hlp, Helper::Xaction * r)
 {
     hlp->queue.push(r);
     ++ hlp->stats.queue_size;
 
     /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
     if (hlp->childs.needNew() > 0) {
@@ -1255,67 +1295,81 @@
         }
 
         selected = srv;
     }
 
     if (!selected) {
         debugs(84, 5, "GetFirstAvailable: None available.");
         return NULL;
     }
 
     if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
         debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
         return NULL;
     }
 
     debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
     return selected;
 }
 
 static helper_stateful_server *
-StatefulGetFirstAvailable(const statefulhelper * hlp)
+StatefulGetFirstAvailable(statefulhelper * hlp)
 {
     dlink_node *n;
     helper_stateful_server *srv = NULL;
+    helper_stateful_server *oldestReservedServer = nullptr;
     debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
 
     if (hlp->childs.n_running == 0)
         return NULL;
 
     for (n = hlp->servers.head; n != NULL; n = n->next) {
         srv = (helper_stateful_server *)n->data;
 
         if (srv->stats.pending)
             continue;
 
-        if (srv->flags.reserved)
+        if (srv->reserved()) {
+            if ((squid_curtime - srv->reservationStart) > hlp->childs.reservationTimeout) {
+                if (!oldestReservedServer)
+                    oldestReservedServer = srv;
+                else if (oldestReservedServer->reservationStart < srv->reservationStart)
+                    oldestReservedServer = srv;
+                debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
+            }
             continue;
+        }
 
         if (srv->flags.shutdown)
             continue;
 
         debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
         return srv;
     }
 
+    if (oldestReservedServer) {
+        debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
+        return oldestReservedServer;
+    }
+
     debugs(84, 5, "StatefulGetFirstAvailable: None available.");
-    return NULL;
+    return nullptr;
 }
 
 static void
 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
 {
     helper_server *srv = (helper_server *)data;
 
     srv->writebuf->clean();
     delete srv->writebuf;
     srv->writebuf = NULL;
     srv->flags.writing = false;
 
     if (flag != Comm::OK) {
         /* Helper server has crashed */
         debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
         return;
     }
 
     if (!srv->wqueue->isNull()) {
         srv->writebuf = srv->wqueue;
@@ -1365,105 +1419,108 @@
 
     debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
 
     ++ srv->stats.uses;
     ++ srv->stats.pending;
     ++ hlp->stats.requests;
 }
 
 static void
 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
 {}
 
 static void
 helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
 {
     statefulhelper *hlp = srv->parent;
 
     if (!cbdataReferenceValid(r->request.data)) {
         debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
         delete r;
-        helperStatefulReleaseServer(srv);
+        hlp->cancelReservation(srv->reservationId);
         return;
     }
 
     debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
 
+    assert(srv->reservationId);
+    r->reply.reservationId = srv->reservationId;
+
     if (r->request.placeholder == 1) {
         /* a callback is needed before this request can _use_ a helper. */
         /* we don't care about releasing this helper. The request NEVER
          * gets to the helper. So we throw away the return code */
         r->reply.result = Helper::Unknown;
-        r->reply.whichServer = srv;
         r->request.callback(r->request.data, r->reply);
         /* throw away the placeholder */
         delete r;
         /* and push the queue. Note that the callback may have submitted a new
          * request to the helper which is why we test for the request */
 
         if (!srv->requests.size())
             helperStatefulServerDone(srv);
 
         return;
     }
 
-    srv->flags.reserved = true;
     srv->requests.push_back(r);
     srv->dispatch_time = current_time;
     AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
                                          CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
     Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, NULL);
     debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
            hlp->id_name << " #" << srv->index << ", " <<
            (int) strlen(r->request.buf) << " bytes");
 
     ++ srv->stats.uses;
     ++ srv->stats.pending;
     ++ hlp->stats.requests;
 }
 
 static void
 helperKickQueue(helper * hlp)
 {
     Helper::Xaction *r;
     helper_server *srv;
 
     while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
         helperDispatch(srv, r);
 }
 
 static void
 helperStatefulKickQueue(statefulhelper * hlp)
 {
     Helper::Xaction *r;
     helper_stateful_server *srv;
-
-    while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
+    while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
+        debugs(84, 5, "found srv-" << srv->index);
+        hlp->reserveServer(srv);
         helperStatefulDispatch(srv, r);
+    }
 }
 
 static void
 helperStatefulServerDone(helper_stateful_server * srv)
 {
     if (!srv->flags.shutdown) {
         helperStatefulKickQueue(srv->parent);
-    } else if (!srv->flags.closing && !srv->flags.reserved && !srv->stats.pending) {
+    } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
         srv->closeWritePipeSafely(srv->parent->id_name);
         return;
     }
 }
 
 void
 helper_server::checkForTimedOutRequests(bool const retry)
 {
     assert(parent->childs.concurrency);
     while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
         Helper::Xaction *r = requests.front();
         RequestIndex::iterator it;
         it = requestsIndex.find(r->request.Id);
         assert(it != requestsIndex.end());
         requestsIndex.erase(it);
         requests.pop_front();
         debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
         void *cbdata;
         bool retried = false;
         if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
@@ -1480,35 +1537,32 @@
                 r->request.callback(cbdata, r->reply);
             } else {
                 r->reply.result = Helper::TimedOut;
                 r->request.callback(cbdata, r->reply);
             }
         }
         --stats.pending;
         ++stats.timedout;
         ++parent->stats.timedout;
         if (!retried)
             delete r;
     }
 }
 
 void
 helper_server::requestTimeout(const CommTimeoutCbParams &io)
 {
     debugs(26, 3, HERE << io.conn);
     helper_server *srv = static_cast<helper_server *>(io.data);
 
-    if (!cbdataReferenceValid(srv))
-        return;
-
     srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
 
     debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
     AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
                                      CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
 
     const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
     const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
 
     commSetConnTimeout(io.conn, timeLeft, timeoutCall);
 }
 

=== modified file 'src/helper.h'
--- src/helper.h	2017-06-20 22:50:03 +0000
+++ src/helper.h	2017-07-26 08:31:19 +0000
@@ -3,62 +3,64 @@
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 /* DEBUG: section 84    Helper process maintenance */
 
 #ifndef SQUID_HELPER_H
 #define SQUID_HELPER_H
 
 #include "base/AsyncCall.h"
 #include "base/InstanceId.h"
 #include "cbdata.h"
 #include "comm/forward.h"
 #include "dlink.h"
 #include "helper/ChildConfig.h"
 #include "helper/forward.h"
 #include "helper/Reply.h"
 #include "helper/Request.h"
+#include "helper/ReservationId.h"
 #include "ip/Address.h"
 #include "sbuf/SBuf.h"
 
 #include <list>
-#include <map>
+#include <unordered_map>
 #include <queue>
 
 class Packable;
 class wordlist;
 
 namespace Helper
 {
 /// Holds the  required data to serve a helper request.
 class Xaction {
     MEMPROXY_CLASS(Helper::Xaction);
 public:
     Xaction(HLPCB *c, void *d, const char *b): request(c, d, b) {}
     Helper::Request request;
     Helper::Reply reply;
 };
 }
 
+class HelperServerBase;
 /**
  * Managers a set of individual helper processes with a common queue of requests.
  *
  * With respect to load, a helper goes through these states (roughly):
  *   idle:   no processes are working on requests (and no requests are queued);
  *   normal: some, but not all processes are working (and no requests are queued);
  *   busy:   all processes are working (and some requests are possibly queued);
  *   overloaded: a busy helper with more than queue-size requests in the queue.
  *
  * A busy helper queues new requests and issues a WARNING every 10 minutes or so.
  * An overloaded helper either drops new requests or keeps queuing them, depending on
  *   whether the caller can handle dropped requests (trySubmit vs helperSubmit APIs).
  * If an overloaded helper has been overloaded for 3+ minutes, an attempt to use
  *   it results in on-persistent-overload action, which may kill worker.
  */
 class helper
 {
     CBDATA_CLASS(helper);
 
 public:
@@ -77,190 +79,249 @@
         memset(&stats, 0, sizeof(stats));
     }
     ~helper();
 
     /// \returns next request in the queue, or nil.
     Helper::Xaction *nextRequest();
 
     /// If possible, submit request. Otherwise, either kill Squid or return false.
     bool trySubmit(const char *buf, HLPCB * callback, void *data);
 
     /// Submits a request to the helper or add it to the queue if none of
     /// the servers is available.
     void submitRequest(Helper::Xaction *r);
 
     /// Dump some stats about the helper state to a Packable object
     void packStatsInto(Packable *p, const char *label = NULL) const;
     /// whether the helper will be in "overloaded" state after one more request
     /// already overloaded helpers return true
     bool willOverload() const;
 
+    /// Updates interall statistics and start new helper server processes after
+    /// an unexpected server exit
+    /// \param needsNewServers true if new servers must started, false otherwise
+    void handleKilledServer(HelperServerBase *srv, bool &needsNewServers);
+
 public:
     wordlist *cmdline;
     dlink_list servers;
     std::queue<Helper::Xaction *> queue;
     const char *id_name;
     Helper::ChildConfig childs;    ///< Configuration settings for number running.
     int ipc_type;
     Ip::Address addr;
     unsigned int droppedRequests; ///< requests not sent during helper overload
     time_t overloadStart; ///< when the helper became overloaded (zero if it is not)
     time_t last_queue_warn;
     time_t last_restart;
     time_t timeout; ///< Requests timeout
     bool retryTimedOut; ///< Whether the timed-out requests must retried
     bool retryBrokenHelper; ///< Whether the requests must retried on BH replies
     SBuf onTimedOutResponse; ///< The response to use when helper response timedout
     char eom;   ///< The char which marks the end of (response) message, normally '\n'
 
     struct _stats {
         int requests;
         int replies;
         int timedout;
         int queue_size;
         int avg_svc_time;
     } stats;
 
 protected:
     friend void helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data);
     bool queueFull() const;
     bool overloaded() const;
     void syncQueueStats();
     bool prepSubmit();
     void submit(const char *buf, HLPCB * callback, void *data);
 };
 
 class statefulhelper : public helper
 {
     CBDATA_CLASS(statefulhelper);
 
 public:
+    typedef std::unordered_map<Helper::ReservationId, helper_stateful_server *> Reservations;
+
     inline statefulhelper(const char *name) : helper(name) {}
     inline ~statefulhelper() {}
 
+public:
+    /// reserve the given server
+    void reserveServer(helper_stateful_server * srv);
+
+    /// undo reserveServer(), clear the reservation and kick the queue
+    void cancelReservation(const Helper::ReservationId reservation);
+
 private:
-    friend void helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver);
-    void submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server *lastserver);
-    bool trySubmit(const char *buf, HLPCB * callback, void *data, helper_stateful_server *lastserver);
+    friend void helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation);
+
+    /// \return the previously reserved server (if the reservation is still valid) or nil
+    helper_stateful_server *findServer(const Helper::ReservationId & reservation);
+
+    void submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation);
+    bool trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation);
+
+    ///< reserved servers indexed by reservation IDs
+    Reservations reservations;
 };
 
-/**
- * Fields shared between stateless and stateful helper servers.
- */
-class HelperServerBase
+/// represents a single helper process abstraction
+class HelperServerBase: public CbdataParent
 {
 public:
+    virtual ~HelperServerBase();
     /** Closes pipes to the helper safely.
      * Handles the case where the read and write pipes are the same FD.
      *
      * \param name displayed for the helper being shutdown if logging an error
      */
     void closePipesSafely(const char *name);
 
     /** Closes the reading pipe.
      * If the read and write sockets are the same the write pipe will
      * also be closed. Otherwise its left open for later handling.
      *
      * \param name displayed for the helper being shutdown if logging an error
      */
     void closeWritePipeSafely(const char *name);
 
+    // TODO: Teach each child to report its child-specific state instead.
+    /// whether the server is locked for exclusive use by a client
+    virtual bool reserved() = 0;
+
+    /// dequeues and sends a Helper::Unknown answer to all queued requests
+    virtual void dropQueued();
+
+    /// the helper object that created this server
+    virtual helper *getParent() const = 0;
+
 public:
     /// Helper program identifier; does not change when contents do,
     ///   including during assignment
     const InstanceId<HelperServerBase> index;
     int pid;
     Ip::Address addr;
     Comm::ConnectionPointer readPipe;
     Comm::ConnectionPointer writePipe;
     void *hIpc;
 
     char *rbuf;
     size_t rbuf_sz;
     size_t roffset;
 
     struct timeval dispatch_time;
     struct timeval answer_time;
 
     dlink_node link;
 
     struct _helper_flags {
         bool writing;
         bool closing;
         bool shutdown;
-        bool reserved;
     } flags;
 
     typedef std::list<Helper::Xaction *> Requests;
     Requests requests; ///< requests in order of submission/expiration
 
     struct {
         uint64_t uses;     //< requests sent to this helper
         uint64_t replies;  //< replies received from this helper
         uint64_t pending;  //< queued lookups waiting to be sent to this helper
         uint64_t releases; //< times release() has been called on this helper (if stateful)
         uint64_t timedout; //< requests which timed-out
     } stats;
     void initStats();
 };
 
 class MemBuf;
 class CommTimeoutCbParams;
 
+// TODO: Rename to StatelessHelperServer and rename HelperServerBase to HelperServer.
+/// represents a single "stateless helper" process
 class helper_server : public HelperServerBase
 {
-    CBDATA_CLASS(helper_server);
+    CBDATA_CHILD(helper_server);
 
 public:
     uint64_t nextRequestId;
 
     MemBuf *wqueue;
     MemBuf *writebuf;
 
     helper *parent;
 
     /// The helper request Xaction object for the current reply .
     /// A helper reply may be distributed to more than one of the retrieved
     /// packets from helper. This member stores the Xaction object as long as
     /// the end-of-message for current reply is not retrieved.
     Helper::Xaction *replyXaction;
 
     /// Whether to ignore current message, because it is timed-out or other reason
     bool ignoreToEom;
 
     // STL says storing std::list iterators is safe when changing the list
     typedef std::map<uint64_t, Requests::iterator> RequestIndex;
     RequestIndex requestsIndex; ///< maps request IDs to requests
 
+    virtual ~helper_server();
     /// Search in queue for the request with requestId, return the related
     /// Xaction object and remove it from queue.
     /// If concurrency is disabled then the requestId is ignored and the
     /// Xaction of the next request in queue is retrieved.
     Helper::Xaction *popRequest(int requestId);
 
     /// Run over the active requests lists and forces a retry, or timedout reply
     /// or the configured "on timeout response" for timedout requests.
     void checkForTimedOutRequests(bool const retry);
 
+    /*HelperServerBase API*/
+    virtual bool reserved() override {return false;}
+    virtual void dropQueued() override;
+    virtual helper *getParent() const override {return parent;}
+
     /// Read timeout handler
     static void requestTimeout(const CommTimeoutCbParams &io);
+
+    /// close handler to handle exited server processes
+    static void HelperServerClosed(helper_server *srv);
 };
 
+// TODO: Rename to StatefulHelperServer and rename HelperServerBase to HelperServer.
+/// represents a single "stateful helper" process
 class helper_stateful_server : public HelperServerBase
 {
-    CBDATA_CLASS(helper_stateful_server);
+    CBDATA_CHILD(helper_stateful_server);
 
 public:
+    virtual ~helper_stateful_server();
+    void reserve();
+    void clearReservation();
+
+    /* HelperServerBase API */
+    virtual bool reserved() override {return reservationId.reserved();}
+    virtual helper *getParent() const override {return parent;}
+
+    /// close handler to handle exited server processes
+    static void HelperServerClosed(helper_stateful_server *srv);
+
     statefulhelper *parent;
+
+    // Reservations temporary lock the server for an exclusive "client" use. The
+    // client keeps the reservation ID as a proof of her reservation. If a
+    // reservation expires, and the server is reserved for another client, then
+    // the reservation ID presented by the late client will not match ours.
+    Helper::ReservationId reservationId; ///< "confirmation ID" of the last
+    time_t reservationStart; ///< when the last `reservation` was made
 };
 
 /* helper.c */
 void helperOpenServers(helper * hlp);
 void helperStatefulOpenServers(statefulhelper * hlp);
 void helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data);
-void helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver);
+void helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, uint64_t reservation);
 void helperShutdown(helper * hlp);
 void helperStatefulShutdown(statefulhelper * hlp);
-void helperStatefulReleaseServer(helper_stateful_server * srv);
 
 #endif /* SQUID_HELPER_H */
 

=== modified file 'src/helper/ChildConfig.cc'
--- src/helper/ChildConfig.cc	2017-02-23 10:29:11 +0000
+++ src/helper/ChildConfig.cc	2017-07-20 08:21:18 +0000
@@ -96,41 +96,43 @@
             if (n_idle < 1) {
                 debugs(0, DBG_CRITICAL, "WARNING OVERIDE: Using idle=0 for helpers causes request failures. Overiding to use idle=1 instead.");
                 n_idle = 1;
             }
         } else if (strncmp(token, "concurrency=", 12) == 0) {
             concurrency = xatoui(token + 12);
         } else if (strncmp(token, "queue-size=", 11) == 0) {
             queue_size = xatoui(token + 11);
             defaultQueueSize = false;
         } else if (strncmp(token, "on-persistent-overload=", 23) == 0) {
             const SBuf action(token + 23);
             if (action.cmp("ERR") == 0)
                 onPersistentOverload = actErr;
             else if (action.cmp("die") == 0)
                 onPersistentOverload = actDie;
             else {
                 debugs(0, DBG_CRITICAL, "ERROR: Unsupported on-persistent-overloaded action: " << action);
                 self_destruct();
                 return;
             }
-        } else {
+        } else if (strncmp(token, "reservation-timeout=", 20) == 0)
+            reservationTimeout = xatoui(token + 20);
+        else {
             debugs(0, DBG_PARSE_NOTE(DBG_IMPORTANT), "ERROR: Undefined option: " << token << ".");
             self_destruct();
             return;
         }
     }
 
     /* simple sanity. */
 
     if (n_startup > n_max) {
         debugs(0, DBG_CRITICAL, "WARNING OVERIDE: Capping startup=" << n_startup << " to the defined maximum (" << n_max <<")");
         n_startup = n_max;
     }
 
     if (n_idle > n_max) {
         debugs(0, DBG_CRITICAL, "WARNING OVERIDE: Capping idle=" << n_idle << " to the defined maximum (" << n_max <<")");
         n_idle = n_max;
     }
 
     if (defaultQueueSize)
         queue_size = 2 * n_max;

=== modified file 'src/helper/ChildConfig.h'
--- src/helper/ChildConfig.h	2017-01-01 00:12:22 +0000
+++ src/helper/ChildConfig.h	2017-07-26 09:03:24 +0000
@@ -87,31 +87,34 @@
 
     /**
      * The requests queue size. By default it is of size 2*n_max
      */
     unsigned int queue_size;
 
     /// how to handle a serious problem with a helper request submission
     enum SubmissionErrorHandlingAction {
         actDie, ///< kill the caller process (i.e., Squid worker)
         actErr  ///< drop the request and send an error to the caller
     };
     /// how to handle a new request for helper that was overloaded for too long
     SubmissionErrorHandlingAction onPersistentOverload;
 
     /**
      * True if the default queue size is used.
      * Needed in the cases where we need to adjust default queue_size in
      * special configurations, for example when redirector_bypass is used.
      */
     bool defaultQueueSize;
+
+    /// older stateful helper server reservations may be forgotten
+    time_t reservationTimeout = 64; // reservation-timeout
 };
 
 } // namespace Helper
 
 /* Legacy parser interface */
 #define parse_HelperChildConfig(c)     (c)->parseConfig()
 #define dump_HelperChildConfig(e,n,c)  storeAppendPrintf((e), "\n%s %d startup=%d idle=%d concurrency=%d\n", (n), (c).n_max, (c).n_startup, (c).n_idle, (c).concurrency)
 #define free_HelperChildConfig(dummy)  // NO.
 
 #endif /* _SQUID_SRC_HELPER_CHILDCONFIG_H */
 

=== modified file 'src/helper/Makefile.am'
--- src/helper/Makefile.am	2017-01-01 00:12:22 +0000
+++ src/helper/Makefile.am	2017-07-18 09:55:33 +0000
@@ -1,22 +1,24 @@
 ## Copyright (C) 1996-2017 The Squid Software Foundation and contributors
 ##
 ## Squid software is distributed under GPLv2+ license and includes
 ## contributions from numerous individuals and organizations.
 ## Please see the COPYING and CONTRIBUTORS files for details.
 ##
 
 include $(top_srcdir)/src/Common.am
 include $(top_srcdir)/src/TestHeaders.am
 
 noinst_LTLIBRARIES = libhelper.la
 
 libhelper_la_SOURCES = \
 	ChildConfig.cc \
 	ChildConfig.h \
 	forward.h \
 	Reply.cc \
 	Reply.h \
 	Request.h \
-	ResultCode.h
+	ResultCode.h \
+	ReservationId.h \
+	ReservationId.cc
 
 EXTRA_DIST= protocol_defines.h

=== modified file 'src/helper/Reply.cc'
--- src/helper/Reply.cc	2017-01-01 00:12:22 +0000
+++ src/helper/Reply.cc	2017-07-18 09:32:41 +0000
@@ -1,41 +1,40 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 /* DEBUG: section 84    Helper process maintenance */
 
 #include "squid.h"
 #include "ConfigParser.h"
 #include "Debug.h"
 #include "helper.h"
 #include "helper/Reply.h"
 #include "rfc1738.h"
 #include "SquidString.h"
 
 Helper::Reply::Reply() :
-    result(Helper::Unknown),
-    whichServer(NULL)
+    result(Helper::Unknown)
 {
 }
 
 bool
 Helper::Reply::accumulate(const char *buf, size_t len)
 {
     if (other_.isNull())
         other_.init(4*1024, 1*1024*1024);
 
     if (other_.potentialSpaceSize() < static_cast<mb_size_t>(len))
         return false; // no space left
 
     other_.append(buf, len);
     return true;
 }
 
 void
 Helper::Reply::finalize()
 {
     debugs(84, 3, "Parsing helper buffer");

=== modified file 'src/helper/Reply.h'
--- src/helper/Reply.h	2017-01-01 00:12:22 +0000
+++ src/helper/Reply.h	2017-07-20 08:30:32 +0000
@@ -1,81 +1,81 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef _SQUID_SRC_HELPER_REPLY_H
 #define _SQUID_SRC_HELPER_REPLY_H
 
 #include "base/CbcPointer.h"
 #include "helper/forward.h"
+#include "helper/ReservationId.h"
 #include "helper/ResultCode.h"
 #include "MemBuf.h"
 #include "Notes.h"
 
 #include <ostream>
 
 namespace Helper
 {
 
 /**
  * This object stores the reply message from a helper lookup
  * It provides parser routing to accept a raw buffer and process the
  * helper reply into fields for easy access by callers
  */
 class Reply
 {
 private:
     // copy are prohibited for now
     Reply(const Helper::Reply &r);
     Reply &operator =(const Helper::Reply &r);
 
 public:
-    explicit Reply(Helper::ResultCode res) : result(res), notes(), whichServer(NULL) {}
+    explicit Reply(Helper::ResultCode res) : result(res), notes() {}
 
     /// Creates a NULL reply
     Reply();
 
     const MemBuf &other() const {return other_.isNull() ? emptyBuf() : other_;};
 
     /** parse a helper response line format:
      *   line     := [ result ] *#( kv-pair )
      *   kv-pair := OWS token '=' ( quoted-string | token )
      *
      * token are URL-decoded.
      * quoted-string are \-escape decoded and the quotes are stripped.
      */
     // XXX: buf should be const but we may need strwordtok() and rfc1738_unescape()
     //void parse(char *buf, size_t len);
     void finalize();
 
     bool accumulate(const char *buf, size_t len);
 
 public:
     /// The helper response 'result' field.
     Helper::ResultCode result;
 
     // list of key=value pairs the helper produced
     NotePairs notes;
 
-    /// for stateful replies the responding helper 'server' needs to be preserved across callbacks
-    CbcPointer<helper_stateful_server> whichServer;
-
+    /// The stateful replies should include the reservation ID
+    Helper::ReservationId reservationId;
 private:
     void parseResponseKeys();
 
     /// Return an empty MemBuf.
     const MemBuf &emptyBuf() const;
 
     /// the remainder of the line
     MemBuf other_;
 };
 
 } // namespace Helper
 
 std::ostream &operator <<(std::ostream &os, const Helper::Reply &r);
 
 #endif /* _SQUID_SRC_HELPER_REPLY_H */
 

=== added file 'src/helper/ReservationId.cc'
--- src/helper/ReservationId.cc	1970-01-01 00:00:00 +0000
+++ src/helper/ReservationId.cc	2017-07-21 09:10:25 +0000
@@ -0,0 +1,17 @@
+#include "helper/ReservationId.h"
+
+Helper::ReservationId
+Helper::ReservationId::Next()
+{
+    static uint64_t Ids = 0;
+    Helper::ReservationId reservation;
+    reservation.id = ++Ids;
+    return reservation;
+}
+
+std::ostream &
+Helper::ReservationId::print(std::ostream &os) const
+{
+    return os << "hlpRes" << id;
+}
+

=== added file 'src/helper/ReservationId.h'
--- src/helper/ReservationId.h	1970-01-01 00:00:00 +0000
+++ src/helper/ReservationId.h	2017-07-21 09:13:36 +0000
@@ -0,0 +1,63 @@
+
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef _SQUID_SRC_HELPER_RESERVATIONID_H
+#define _SQUID_SRC_HELPER_RESERVATIONID_H
+
+#include <ostream>
+
+namespace Helper
+{
+/// a (temporary) lock on a (stateful) helper channel
+class ReservationId
+{
+public:
+    static ReservationId Next();
+
+    bool reserved() const { return id > 0; }
+
+    explicit operator bool() const { return reserved(); }
+    bool operator !() const { return !reserved(); }
+    bool operator ==(const Helper::ReservationId &other) const { return id == other.id; }
+    bool operator !=(const Helper::ReservationId &other) const { return !(*this == other); }
+
+    void clear() { id = 0; }
+    uint64_t value() const {return id;}
+
+    /// dumps the reservation info for debugging
+    std::ostream &print(std::ostream &os) const;
+
+private:
+    uint64_t id = 0; ///< uniquely identifies this reservation
+};
+
+}; // namespace Helper
+
+inline std::ostream &
+operator <<(std::ostream &os, const Helper::ReservationId &id)
+{
+    return id.print(os);
+}
+
+namespace std {
+/// default hash functor to support std::unordered_map<HelperReservationId, *>
+template <>
+struct hash<Helper::ReservationId>
+{
+    typedef Helper::ReservationId argument_type;
+    typedef std::size_t result_type;
+    result_type operator()(const argument_type &reservation) const noexcept
+    {
+        std::hash<uint64_t> aHash; 
+        return aHash(reservation.value());
+    }
+};
+}
+
+#endif

=== modified file 'src/tests/stub_helper.cc'
--- src/tests/stub_helper.cc	2017-06-20 22:50:03 +0000
+++ src/tests/stub_helper.cc	2017-06-21 10:28:01 +0000
@@ -1,28 +1,28 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #include "squid.h"
 #include "helper.h"
 
 #define STUB_API "helper.cc"
 #include "tests/STUB.h"
 
 void helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data) STUB
 void helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver) STUB
 helper::~helper() STUB
 CBDATA_CLASS_INIT(helper);
 void helper::packStatsInto(Packable *p, const char *label) const STUB
+void helper::openServers() STUB
+void statefulhelper::openServers() STUB
 
 void helperShutdown(helper * hlp) STUB
 void helperStatefulShutdown(statefulhelper * hlp) STUB
-void helperOpenServers(helper * hlp) STUB
-void helperStatefulOpenServers(statefulhelper * hlp) STUB
 helper_stateful_server *helperStatefulDefer(statefulhelper * hlp) STUB_RETVAL(NULL)
 void helperStatefulReleaseServer(helper_stateful_server * srv) STUB
 CBDATA_CLASS_INIT(statefulhelper);
 

_______________________________________________
squid-dev mailing list
[email protected]
http://lists.squid-cache.org/listinfo/squid-dev

Reply via email to