Hello,

    The attached patch is the first in a short series of patches that
improve Squid support for the Happy Eyeballs principle. The motivation
for these patches has been discussed on the dns_wait_for_all thread:
http://lists.squid-cache.org/pipermail/squid-dev/2016-September/006831.html

With this change, Squid no longer waits for all request forwarding
destinations to be resolved. Instead, it uses each fully resolved
destination as soon as it is needed. More info is in the patch preamble.

The next planned step is to deliver A and AAAA answers to the peer
selection code independently and ASAP. When both changes are combined,
the FwdState/tunneling code will receive IPvX addresses without waiting
for IPvY addresses (and/or other destination names) to be resolved,
making user eyeballs happier and, hence, facilitating IPv6 deployments.


This patch also adds the peer type to dumped Connection info. I moved
the operator "<<" definition from the Connection.h header into
Connection.cc because we should not burden all Connection users with
these low-level (and performance-unrelated) details. I also removed the
associated comments because they were partially misleading (or perhaps
outdated) and partially misplaced (Connection.h is not the right place
to state basic C++ operator overloading rules). If there are no
objections, I will commit this minor improvement separately.

The FwdServer class itself is unchanged, but its declaration has been
moved from the PeerSelectState.h header to peer_select.cc because
peerSelect() users do not use FwdServer.

The peerCountMcastPeers functionality is unchanged. I only documented
how it abuses ps_state and marked the abused ps_state field with XXX to
discourage future abuse.


Cheers,

Alex.
Happy Eyeballs: Use each fully resolved forwarding destination ASAP.

Do not wait for all request forwarding destinations to be resolved. Use
each resolved destination as soon as it is needed. This change does not
improve or affect IPv4/IPv6 handling (Squid still waits for both DNS A
and AAAA answers when resolving a single destination name), but the peer
selection code can now deliver each IP address to the FwdState/tunneling
code without waiting for all other destination names to be DNS-resolved.
TODO: Deliver A and AAAA answers to the peer selection code ASAP.

This change speeds up forwarding in peering environments where peers may
need frequent DNS resolutions, but that was not the motivation here.

Also fixed missing cbdataReferenceDone(psstate->callback_data) calls
in three error handling cases. These cases probably leaked memory.

=== modified file 'src/FwdState.cc'
--- src/FwdState.cc	2017-04-12 23:34:50 +0000
+++ src/FwdState.cc	2017-05-15 21:48:02 +0000
@@ -27,63 +27,61 @@
 #include "fd.h"
 #include "fde.h"
 #include "FwdState.h"
 #include "globals.h"
 #include "gopher.h"
 #include "hier_code.h"
 #include "http.h"
 #include "http/Stream.h"
 #include "HttpReply.h"
 #include "HttpRequest.h"
 #include "icmp/net_db.h"
 #include "internal.h"
 #include "ip/Intercept.h"
 #include "ip/QosConfig.h"
 #include "ip/tools.h"
 #include "MemObject.h"
 #include "mgr/Registration.h"
 #include "neighbors.h"
 #include "pconn.h"
 #include "PeerPoolMgr.h"
-#include "PeerSelectState.h"
 #include "security/BlindPeerConnector.h"
 #include "SquidConfig.h"
 #include "SquidTime.h"
 #include "ssl/PeekingPeerConnector.h"
 #include "Store.h"
 #include "StoreClient.h"
 #include "urn.h"
 #include "whois.h"
 #if USE_OPENSSL
 #include "ssl/cert_validate_message.h"
 #include "ssl/Config.h"
 #include "ssl/ErrorDetail.h"
 #include "ssl/helper.h"
 #include "ssl/ServerBump.h"
 #include "ssl/support.h"
 #else
 #include "security/EncryptorAnswer.h"
 #endif
 
 #include <cerrno>
 
-static PSC fwdPeerSelectionCompleteWrapper;
 static CLCB fwdServerClosedWrapper;
 static CNCB fwdConnectDoneWrapper;
 
 static OBJH fwdStats;
 
 #define MAX_FWD_STATS_IDX 9
 static int FwdReplyCodes[MAX_FWD_STATS_IDX + 1][Http::scInvalidHeader + 1];
 
 static PconnPool *fwdPconnPool = new PconnPool("server-peers", NULL);
 CBDATA_CLASS_INIT(FwdState);
 
 class FwdStatePeerAnswerDialer: public CallDialer, public Security::PeerConnector::CbDialer
 {
 public:
     typedef void (FwdState::*Method)(Security::EncryptorAnswer &);
 
     FwdStatePeerAnswerDialer(Method method, FwdState *fwd):
         method_(method), fwd_(fwd), answer_() {}
 
     /* CallDialer API */
@@ -166,41 +164,41 @@ void FwdState::start(Pointer aSelf)
     // so it registers its own abort handler that calls ours when needed.
     if (!request->flags.ftpNative)
         entry->registerAbort(FwdState::abort, this);
 
 #if STRICT_ORIGINAL_DST
     // Bug 3243: CVE 2009-0801
     // Bypass of browser same-origin access control in intercepted communication
     // To resolve this we must force DIRECT and only to the original client destination.
     const bool isIntercepted = request && !request->flags.redirected && (request->flags.intercepted || request->flags.interceptTproxy);
     const bool useOriginalDst = Config.onoff.client_dst_passthru || (request && !request->flags.hostVerified);
     if (isIntercepted && useOriginalDst) {
         selectPeerForIntercepted();
         // 3.2 does not suppro re-wrapping inside CONNECT.
         // our only alternative is to fake destination "found" and continue with the forwarding.
         startConnectionOrFail();
         return;
     }
 #endif
 
     // do full route options selection
-    peerSelect(&serverDestinations, request, al, entry, fwdPeerSelectionCompleteWrapper, this);
+    startSelectingDestinations(request, al, entry);
 }
 
 #if STRICT_ORIGINAL_DST
 /// bypasses peerSelect() when dealing with intercepted requests
 void
 FwdState::selectPeerForIntercepted()
 {
     // use pinned connection if available
     Comm::ConnectionPointer p;
     if (ConnStateData *client = request->pinnedConnection()) {
         p = client->validatePinnedConnection(request, NULL);
         if (Comm::IsConnOpen(p)) {
             /* duplicate peerSelectPinned() effects */
             p->peerType = PINNED;
             entry->ping_status = PING_DONE;     /* Skip ICP */
 
             debugs(17, 3, "reusing a pinned conn: " << *p);
             serverDestinations.push_back(p);
         } else {
             debugs(17,2, "Pinned connection is not valid: " << p);
@@ -516,51 +514,77 @@ FwdState::complete()
 
         // drop the last path off the selection list. try the next one.
         serverDestinations.erase(serverDestinations.begin());
         startConnectionOrFail();
 
     } else {
         if (Comm::IsConnOpen(serverConn))
             debugs(17, 3, HERE << "server FD " << serverConnection()->fd << " not re-forwarding status " << entry->getReply()->sline.status());
         else
             debugs(17, 3, HERE << "server (FD closed) not re-forwarding status " << entry->getReply()->sline.status());
         EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT);
         entry->complete();
 
         if (!Comm::IsConnOpen(serverConn))
             completed();
 
         self = NULL; // refcounted
     }
 }
 
-/**** CALLBACK WRAPPERS ************************************************************/
+void
+FwdState::noteDestination(Comm::ConnectionPointer path)
+{
+    const bool wasBlocked = serverDestinations.empty();
+    serverDestinations.push_back(path);
+    if (wasBlocked)
+        startConnectionOrFail();
+    // else continue to use one of the previously noted destinations;
+    // if all of them fail, we may try this path
+}
 
-static void
-fwdPeerSelectionCompleteWrapper(Comm::ConnectionList *, ErrorState *err, void *data)
+void
+FwdState::noteDestinationsEnd(ErrorState *selectionError)
 {
-    FwdState *fwd = (FwdState *) data;
-    if (err)
-        fwd->fail(err);
-    fwd->startConnectionOrFail();
+    PeerSelectionInitiator::subscribed = false;
+    if (const bool wasBlocked = serverDestinations.empty()) {
+        // no destination left to try: log why and let startConnectionOrFail() fail
+        if (selectionError) {
+            debugs(17, 3, "Will abort forwarding because path selection has failed.");
+            Must(!err); // if we tried to connect, then path selection succeeded
+            fail(selectionError);
+        } else if (err) {
+            debugs(17, 3, "Will abort forwarding because all found paths have failed.");
+        } else {
+            debugs(17, 3, "Will abort forwarding because path selection found no paths.");
+        }
+
+        startConnectionOrFail(); // will choose the OrFail code path
+        return;
+    }
+    // else continue to use one of the previously noted destinations;
+    // if all of them fail, forwarding as a whole will fail
+    Must(!selectionError); // finding at least one path means selection succeeded
 }
 
+/**** CALLBACK WRAPPERS ************************************************************/
+
 static void
 fwdServerClosedWrapper(const CommCloseCbParams &params)
 {
     FwdState *fwd = (FwdState *)params.data;
     fwd->serverClosed(params.fd);
 }
 
 void
 fwdConnectDoneWrapper(const Comm::ConnectionPointer &conn, Comm::Flag status, int xerrno, void *data)
 {
     FwdState *fwd = (FwdState *) data;
     fwd->connectDone(conn, status, xerrno);
 }
 
 /**** PRIVATE *****************************************************************/
 
 /*
  * FwdState::checkRetry
  *
  * Return TRUE if the request SHOULD be retried.  This method is

=== modified file 'src/FwdState.h'
--- src/FwdState.h	2017-04-12 23:34:50 +0000
+++ src/FwdState.h	2017-05-12 15:40:21 +0000
@@ -1,84 +1,85 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef SQUID_FORWARD_H
 #define SQUID_FORWARD_H
 
 #include "base/RefCount.h"
 #include "comm.h"
 #include "comm/Connection.h"
 #include "err_type.h"
 #include "fde.h"
 #include "http/StatusCode.h"
 #include "ip/Address.h"
+#include "PeerSelectState.h"
 #include "security/forward.h"
 #if USE_OPENSSL
 #include "ssl/support.h"
 #endif
 
 /* forward decls */
 
 class AccessLogEntry;
 typedef RefCount<AccessLogEntry> AccessLogEntryPointer;
 class PconnPool;
 typedef RefCount<PconnPool> PconnPoolPointer;
 class ErrorState;
 class HttpRequest;
 
 #if USE_OPENSSL
 namespace Ssl
 {
 class ErrorDetail;
 class CertValidationResponse;
 };
 #endif
 
 /**
  * Returns the TOS value that we should be setting on the connection
  * to the server, based on the ACL.
  */
 tos_t GetTosToServer(HttpRequest * request);
 
 /**
  * Returns the Netfilter mark value that we should be setting on the
  * connection to the server, based on the ACL.
  */
 nfmark_t GetNfmarkToServer(HttpRequest * request);
 
 /// Sets initial TOS value and Netfilter for the future outgoing connection.
 void GetMarkingsToServer(HttpRequest * request, Comm::Connection &conn);
 
 class HelperReply;
 
-class FwdState : public RefCountable
+class FwdState: public RefCountable, public PeerSelectionInitiator
 {
-    CBDATA_CLASS(FwdState);
+    CBDATA_CHILD(FwdState);
 
 public:
     typedef RefCount<FwdState> Pointer;
-    ~FwdState();
+    virtual ~FwdState();
     static void initModule();
 
     /// Initiates request forwarding to a peer or origin server.
     static void Start(const Comm::ConnectionPointer &client, StoreEntry *, HttpRequest *, const AccessLogEntryPointer &alp);
     /// Same as Start() but no master xaction info (AccessLogEntry) available.
     static void fwdStart(const Comm::ConnectionPointer &client, StoreEntry *, HttpRequest *);
     /// time left to finish the whole forwarding process (which started at fwdStart)
     static time_t ForwardTimeout(const time_t fwdStart);
     /// Whether there is still time to re-try after a previous connection failure.
     /// \param fwdStart The start time of the peer selection/connection process.
     static bool EnoughTimeToReForward(const time_t fwdStart);
 
     /// This is the real beginning of server connection. Call it whenever
     /// the forwarding server destination has changed and a new one needs to be opened.
     /// Produces the cannot-forward error on fail if no better error exists.
     void startConnectionOrFail();
 
     void fail(ErrorState *err);
     void unregister(Comm::ConnectionPointer &conn);
     void unregister(int fd);
@@ -93,40 +94,44 @@ public:
     bool checkRetry();
     bool checkRetriable();
     void dispatch();
     /// Pops a connection from connection pool if available. If not
     /// checks the peer stand-by connection pool for available connection.
     Comm::ConnectionPointer pconnPop(const Comm::ConnectionPointer &dest, const char *domain);
     void pconnPush(Comm::ConnectionPointer & conn, const char *domain);
 
     bool dontRetry() { return flags.dont_retry; }
 
     void dontRetry(bool val) { flags.dont_retry = val; }
 
     /** return a ConnectionPointer to the current server connection (may or may not be open) */
     Comm::ConnectionPointer const & serverConnection() const { return serverConn; };
 
 private:
     // hidden for safer management of self; use static fwdStart
     FwdState(const Comm::ConnectionPointer &client, StoreEntry *, HttpRequest *, const AccessLogEntryPointer &alp);
     void start(Pointer aSelf);
 
+    /* PeerSelectionInitiator API */
+    virtual void noteDestination(Comm::ConnectionPointer conn) override;
+    virtual void noteDestinationsEnd(ErrorState *selectionError) override;
+
 #if STRICT_ORIGINAL_DST
     void selectPeerForIntercepted();
 #endif
     static void logReplyStatus(int tries, const Http::StatusCode status);
     void doneWithRetries();
     void completed();
     void retryOrBail();
     ErrorState *makeConnectingError(const err_type type) const;
     void connectedToPeer(Security::EncryptorAnswer &answer);
     static void RegisterWithCacheManager(void);
 
     /// stops monitoring server connection for closure and updates pconn stats
     void closeServerConnection(const char *reason);
 
     void syncWithServerConn(const char *host);
     void syncHierNote(const Comm::ConnectionPointer &server, const char *host);
 
 public:
     StoreEntry *entry;
     HttpRequest *request;

=== modified file 'src/Makefile.am'
--- src/Makefile.am	2017-02-19 17:13:27 +0000
+++ src/Makefile.am	2017-05-16 00:05:30 +0000
@@ -980,41 +980,42 @@ tests_testHttpReply_SOURCES=\
 	tests/stub_libmgr.cc \
 	tests/stub_libsecurity.cc \
 	tests/stub_libsslsquid.cc \
 	StatCounters.h \
 	StatCounters.cc \
 	StatHist.h \
 	tests/stub_StatHist.cc \
 	repl_modules.h \
 	tests/stub_store.cc \
 	tests/stub_store_stats.cc \
 	tools.h \
 	tests/stub_tools.cc \
 	tests/stub_HttpRequest.cc \
 	tests/testHttpReply.cc \
 	tests/testHttpReply.h \
 	tests/stub_time.cc \
 	url.cc \
 	wordlist.h \
 	wordlist.cc
 nodist_tests_testHttpReply_SOURCES=\
-	$(TESTSOURCES)
+	$(TESTSOURCES) \
+	hier_code.cc
 tests_testHttpReply_LDFLAGS = $(LIBADD_DL)
 tests_testHttpReply_LDADD=\
 	CommCalls.o \
 	http/libhttp.la \
 	parser/libparser.la \
 	acl/libacls.la \
 	acl/libapi.la \
 	acl/libstate.la \
 	anyp/libanyp.la \
 	ip/libip.la \
 	base/libbase.la \
 	ipc/libipc.la \
 	mem/libmem.la \
 	sbuf/libsbuf.la \
 	$(top_builddir)/lib/libmisccontainers.la \
 	$(top_builddir)/lib/libmiscencoding.la \
 	$(top_builddir)/lib/libmiscutil.la \
 	$(NETTLELIB) \
 	$(SSLLIB) \
 	$(LIBCPPUNIT_LIBS) \
@@ -1536,40 +1537,41 @@ tests_testDiskIO_SOURCES = \
 	tests/stub_stat.cc \
 	tests/stub_store_client.cc \
 	tests/stub_store_stats.cc \
 	store_rebuild.h \
 	tests/stub_store_rebuild.cc \
 	tests/stub_UdsOp.cc \
 	tests/testDiskIO.cc \
 	tests/testDiskIO.h \
 	tests/testStoreSupport.cc \
 	tests/testStoreSupport.h \
 	tests/stub_time.cc \
 	$(UNLINKDSOURCE) \
 	url.cc \
 	$(WIN32_SOURCE) \
 	wordlist.h \
 	wordlist.cc \
 	tools.h \
 	tests/stub_tools.cc
 nodist_tests_testDiskIO_SOURCES= \
 	$(TESTSOURCES) \
+	hier_code.cc \
 	SquidMath.cc \
 	SquidMath.h \
 	swap_log_op.cc
 tests_testDiskIO_LDADD = \
 	libsquid.la \
 	http/libhttp.la \
 	parser/libparser.la \
 	SquidConfig.o \
 	CommCalls.o \
 	ident/libident.la \
 	acl/libacls.la \
 	acl/libstate.la \
 	comm/libcomm.la \
 	ip/libip.la \
 	fs/libfs.la \
 	ipc/libipc.la \
 	$(REPL_OBJS) \
 	$(ADAPTATION_LIBS) \
 	DiskIO/libdiskio.la \
 	acl/libapi.la \
@@ -2997,40 +2999,41 @@ tests_testUfs_SOURCES = \
 	StrList.cc \
 	HttpHdrRange.cc \
 	ETag.cc \
 	tests/stub_errorpage.cc \
 	tests/stub_HttpRequest.cc \
 	log/access_log.h \
 	tests/stub_access_log.cc \
 	refresh.h \
 	refresh.cc \
 	tests/stub_store_client.cc \
 	tools.h \
 	tests/stub_tools.cc \
 	tests/testStoreSupport.cc \
 	tests/testStoreSupport.h \
 	time.cc \
 	wordlist.h \
 	wordlist.cc
 
 nodist_tests_testUfs_SOURCES = \
 	$(TESTSOURCES) \
+	hier_code.cc \
 	SquidMath.cc \
 	SquidMath.h \
 	swap_log_op.cc
 tests_testUfs_LDADD = \
 	http/libhttp.la \
 	parser/libparser.la \
 	CommCalls.o \
 	ident/libident.la \
 	acl/libacls.la \
 	acl/libstate.la \
 	acl/libapi.la \
 	libsquid.la \
 	ip/libip.la \
 	fs/libfs.la \
 	mgr/libmgr.la \
 	$(REPL_OBJS) \
 	acl/libacls.la \
 	DiskIO/libdiskio.la \
 	acl/libapi.la \
 	anyp/libanyp.la \
@@ -3176,44 +3179,45 @@ tests_testRock_SOURCES = \
 	tests/stub_libsecurity.cc \
 	tests/stub_MemStore.cc \
 	mime.h \
 	tests/stub_mime.cc \
 	tests/stub_neighbors.cc \
 	tests/stub_Port.cc \
 	tests/stub_pconn.cc \
 	tests/stub_store_client.cc \
 	store_rebuild.h \
 	tests/stub_store_rebuild.cc \
 	tests/stub_store_stats.cc \
 	tools.h \
 	tests/stub_tools.cc \
 	time.cc \
 	url.cc \
 	wordlist.h \
 	wordlist.cc \
 	$(DELAY_POOL_SOURCE) \
 	$(UNLINKDSOURCE)
 nodist_tests_testRock_SOURCES = \
-	swap_log_op.cc \
+	$(TESTSOURCES) \
+	hier_code.cc \
 	SquidMath.cc \
 	SquidMath.h \
-	$(TESTSOURCES)
+	swap_log_op.cc
 tests_testRock_LDADD = \
 	http/libhttp.la \
 	parser/libparser.la \
 	libsquid.la \
 	comm/libcomm.la \
 	ip/libip.la \
 	fs/libfs.la \
 	$(COMMON_LIBS) \
 	$(REPL_OBJS) \
 	DiskIO/libdiskio.la \
 	acl/libacls.la \
 	acl/libapi.la \
 	acl/libstate.la \
 	anyp/libanyp.la \
 	eui/libeui.la \
 	$(SSL_LIBS) \
 	ipc/libipc.la \
 	base/libbase.la \
 	mem/libmem.la \
 	store/libstore.la \

=== modified file 'src/PeerSelectState.h'
--- src/PeerSelectState.h	2017-01-01 00:12:22 +0000
+++ src/PeerSelectState.h	2017-05-15 21:14:48 +0000
@@ -5,98 +5,111 @@
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef   SQUID_PEERSELECTSTATE_H
 #define   SQUID_PEERSELECTSTATE_H
 
 #include "AccessLogEntry.h"
 #include "acl/Checklist.h"
 #include "base/CbcPointer.h"
 #include "comm/forward.h"
 #include "hier_code.h"
 #include "ip/Address.h"
 #include "mem/forward.h"
 #include "PingData.h"
 
 class HttpRequest;
 class StoreEntry;
 class ErrorState;
 
-typedef void PSC(Comm::ConnectionList *, ErrorState *, void *);
-
-void peerSelect(Comm::ConnectionList *, HttpRequest *, AccessLogEntry::Pointer const&, StoreEntry *, PSC *, void *data);
 void peerSelectInit(void);
 
-/**
- * A CachePeer which has been selected as a possible destination.
- * Listed as pointers here so as to prevent duplicates being added but will
- * be converted to a set of IP address path options before handing back out
- * to the caller.
- *
- * Certain connection flags and outgoing settings will also be looked up and
- * set based on the received request and CachePeer settings before handing back.
- */
-class FwdServer
+/// Interface for those who need a list of peers to forward a request to.
+class PeerSelectionInitiator: public CbdataParent
 {
-    MEMPROXY_CLASS(FwdServer);
-
 public:
-    FwdServer(CachePeer *p, hier_code c) :
-        _peer(p),
-        code(c),
-        next(nullptr)
-    {}
-
-    CbcPointer<CachePeer> _peer;                /* NULL --> origin server */
-    hier_code code;
-    FwdServer *next;
+    virtual ~PeerSelectionInitiator() = default;
+
+    /// called when a new unique destination has been found
+    virtual void noteDestination(Comm::ConnectionPointer path) = 0;
+
+    /// called when there will be no more noteDestination() calls
+    /// \param error is a possible reason why no destinations were found; it is
+    /// guaranteed to be nil if there was at least one noteDestination() call
+    virtual void noteDestinationsEnd(ErrorState *error) = 0;
+
+    /// whether noteDestination() and noteDestinationsEnd() calls are allowed
+    bool subscribed = false;
+
+/* protected: */
+    /// Initiates asynchronous peer selection that eventually
+    /// results in zero or more noteDestination() calls and
+    /// exactly one noteDestinationsEnd() call.
+    void startSelectingDestinations(HttpRequest *request, const AccessLogEntry::Pointer &ale, StoreEntry *entry);
 };
 
+class FwdServer;
+
 class ps_state
 {
     CBDATA_CLASS(ps_state);
 
 public:
-    ps_state();
+    explicit ps_state(PeerSelectionInitiator *initiator);
     ~ps_state();
 
     // Produce a URL for display identifying the transaction we are
     // trying to locate a peer for.
     const SBuf url() const;
 
+    /// \returns valid/interested peer initiator or nil
+    PeerSelectionInitiator *interestedInitiator();
+
+    /// \returns whether the initiator may use more destinations
+    bool wantsMoreDestinations() const;
+
+    /// processes a newly discovered/finalized path
+    void handlePath(Comm::ConnectionPointer &path, FwdServer &fs);
+
     HttpRequest *request;
     AccessLogEntry::Pointer al; ///< info for the future access.log entry
     StoreEntry *entry;
     allow_t always_direct;
     allow_t never_direct;
     int direct;   // TODO: fold always_direct/never_direct/prefer_direct into this now that ACL can do a multi-state result.
-    PSC *callback;
-    void *callback_data;
+    size_t foundPaths = 0; ///< number of unique destinations identified so far
+    void *peerCountMcastPeerXXX = nullptr; ///< a hack to help peerCountMcastPeersStart()
     ErrorState *lastError;
 
-    Comm::ConnectionList *paths;    ///< the callers paths array. to be filled with our final results.
     FwdServer *servers;    ///< temporary linked list of peers we will pass back.
 
     /*
      * Why are these Ip::Address instead of CachePeer *?  Because a
      * CachePeer structure can become invalid during the CachePeer selection
      * phase, specifically after a reconfigure.  Thus we need to lookup
      * the CachePeer * based on the address when we are finally ready to
      * reference the CachePeer structure.
      */
 
     Ip::Address first_parent_miss;
 
     Ip::Address closest_parent_miss;
     /*
      * ->hit can be CachePeer* because it should only be
      * accessed during the thread when it is set
      */
     CachePeer *hit;
     peer_t hit_type;
     ping_data ping;
     ACLChecklist *acl_checklist;
+
+    const InstanceId<ps_state> id; ///< unique identification in worker log
+
+private:
+
+    typedef CbcPointer<PeerSelectionInitiator> Initiator;
+    Initiator initiator_; ///< recipient of the destinations we select; use interestedInitiator() to access
 };
 
 #endif /* SQUID_PEERSELECTSTATE_H */
 

=== modified file 'src/comm/Connection.cc'
--- src/comm/Connection.cc	2017-04-12 23:34:50 +0000
+++ src/comm/Connection.cc	2017-05-13 18:50:54 +0000
@@ -1,39 +1,40 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #include "squid.h"
 #include "CachePeer.h"
 #include "cbdata.h"
 #include "comm.h"
 #include "comm/Connection.h"
 #include "fde.h"
 #include "FwdState.h"
 #include "neighbors.h"
 #include "security/NegotiationHistory.h"
 #include "SquidConfig.h"
 #include "SquidTime.h"
+#include <ostream>
 
 class CachePeer;
 bool
 Comm::IsConnOpen(const Comm::ConnectionPointer &conn)
 {
     return conn != NULL && conn->isOpen();
 }
 
 Comm::Connection::Connection() :
     peerType(HIER_NONE),
     fd(-1),
     tos(0),
     nfmark(0),
     flags(COMM_NONBLOCKING),
     peer_(nullptr),
     startTime_(squid_curtime),
     tlsHistory(nullptr)
 {
     *rfc931 = 0; // quick init the head. the rest does not matter.
 }
@@ -135,20 +136,36 @@ Comm::Connection::tlsNegotiations()
 time_t
 Comm::Connection::connectTimeout(const time_t fwdStart) const
 {
     // a connection opening timeout (ignoring forwarding time limits for now)
     const CachePeer *peer = getPeer();
     const time_t ctimeout = peer ? peerConnectTimeout(peer) : Config.Timeout.connect;
 
     // time we have left to finish the whole forwarding process
     const time_t fwdTimeLeft = FwdState::ForwardTimeout(fwdStart);
 
     // The caller decided to connect. If there is no time left, to protect
     // connecting code from trying to establish a connection while a zero (i.e.,
     // "immediate") timeout notification is firing, ensure a positive timeout.
     // XXX: This hack gives some timed-out forwarding sequences more time than
     // some sequences that have not quite reached the forwarding timeout yet!
     const time_t ftimeout = fwdTimeLeft ? fwdTimeLeft : 5; // seconds
 
     return min(ctimeout, ftimeout);
 }
 
+std::ostream &
+operator << (std::ostream &os, const Comm::Connection &conn)
+{
+    os << "local=" << conn.local << " remote=" << conn.remote;
+    if (conn.peerType)
+        os << ' ' << hier_code_str[conn.peerType];
+    if (conn.fd >= 0)
+        os << " FD " << conn.fd;
+    if (conn.flags != COMM_UNSET)
+        os << " flags=" << conn.flags;
+#if USE_IDENT
+    if (*conn.rfc931)
+        os << " IDENT::" << conn.rfc931;
+#endif
+    return os;
+}

=== modified file 'src/comm/Connection.h'
--- src/comm/Connection.h	2017-04-12 23:34:50 +0000
+++ src/comm/Connection.h	2017-05-15 19:01:17 +0000
@@ -155,49 +155,32 @@ public:
     char rfc931[USER_IDENT_SZ];
 
 #if USE_SQUID_EUI
     Eui::Eui48 remoteEui48;
     Eui::Eui64 remoteEui64;
 #endif
 
 private:
     /** cache_peer data object (if any) */
     CachePeer *peer_;
 
     /** The time the connection object was created */
     time_t startTime_;
 
     /** TLS connection details*/
     Security::NegotiationHistory *tlsHistory;
 };
 
 }; // namespace Comm
 
-// NP: Order and namespace here is very important.
-//     * The second define inlines the first.
-//     * Stream inheritance overloading is searched in the global scope first.
-
-inline std::ostream &
-operator << (std::ostream &os, const Comm::Connection &conn)
-{
-    os << "local=" << conn.local << " remote=" << conn.remote;
-    if (conn.fd >= 0)
-        os << " FD " << conn.fd;
-    if (conn.flags != COMM_UNSET)
-        os << " flags=" << conn.flags;
-#if USE_IDENT
-    if (*conn.rfc931)
-        os << " IDENT::" << conn.rfc931;
-#endif
-    return os;
-}
+std::ostream &operator << (std::ostream &os, const Comm::Connection &conn);
 
 inline std::ostream &
 operator << (std::ostream &os, const Comm::ConnectionPointer &conn)
 {
     if (conn != NULL)
         os << *conn;
     return os;
 }
 
 #endif
 

=== modified file 'src/neighbors.cc'
--- src/neighbors.cc	2017-05-05 19:42:51 +0000
+++ src/neighbors.cc	2017-05-12 23:05:31 +0000
@@ -1339,97 +1339,98 @@ peerProbeConnectDone(const Comm::Connect
     // TODO: log this traffic.
 }
 
 static void
 peerCountMcastPeersSchedule(CachePeer * p, time_t when)
 {
     if (p->mcast.flags.count_event_pending)
         return;
 
     eventAdd("peerCountMcastPeersStart",
              peerCountMcastPeersStart,
              p,
              (double) when, 1);
 
     p->mcast.flags.count_event_pending = true;
 }
 
 static void
 peerCountMcastPeersStart(void *data)
 {
+    // XXX: Do not create lots of complex fake objects (while abusing their
+    // APIs) to pass around a few basic data points like start_ping and ping!
     CachePeer *p = (CachePeer *)data;
     ps_state *psstate;
     StoreEntry *fake;
     MemObject *mem;
     icp_common_t *query;
     int reqnum;
     LOCAL_ARRAY(char, url, MAX_URL);
     assert(p->type == PEER_MULTICAST);
     p->mcast.flags.count_event_pending = false;
     snprintf(url, MAX_URL, "http://";);
     p->in_addr.toUrl(url+7, MAX_URL -8 );
     strcat(url, "/");
     fake = storeCreateEntry(url, url, RequestFlags(), Http::METHOD_GET);
     HttpRequest *req = HttpRequest::CreateFromUrl(url);
-    psstate = new ps_state;
+    psstate = new ps_state(nullptr);
     psstate->request = req;
     HTTPMSGLOCK(psstate->request);
     psstate->entry = fake;
-    psstate->callback = NULL;
-    psstate->callback_data = cbdataReference(p);
+    psstate->peerCountMcastPeerXXX = cbdataReference(p);
     psstate->ping.start = current_time;
     mem = fake->mem_obj;
     mem->request = psstate->request;
     mem->start_ping = current_time;
     mem->ping_reply_callback = peerCountHandleIcpReply;
     mem->ircb_data = psstate;
     mcastSetTtl(icpOutgoingConn->fd, p->mcast.ttl);
     p->mcast.id = mem->id;
     reqnum = icpSetCacheKey((const cache_key *)fake->key);
     query = _icp_common_t::createMessage(ICP_QUERY, 0, url, reqnum, 0);
     icpUdpSend(icpOutgoingConn->fd, p->in_addr, query, LOG_ICP_QUERY, 0);
     fake->ping_status = PING_WAITING;
     eventAdd("peerCountMcastPeersDone",
              peerCountMcastPeersDone,
              psstate,
              Config.Timeout.mcast_icp_query / 1000.0, 1);
     p->mcast.flags.counting = true;
     peerCountMcastPeersSchedule(p, MCAST_COUNT_RATE);
 }
 
 static void
 peerCountMcastPeersDone(void *data)
 {
     ps_state *psstate = (ps_state *)data;
     StoreEntry *fake = psstate->entry;
 
-    if (cbdataReferenceValid(psstate->callback_data)) {
-        CachePeer *p = (CachePeer *)psstate->callback_data;
+    if (cbdataReferenceValid(psstate->peerCountMcastPeerXXX)) {
+        CachePeer *p = (CachePeer *)psstate->peerCountMcastPeerXXX;
         p->mcast.flags.counting = false;
         p->mcast.avg_n_members = Math::doubleAverage(p->mcast.avg_n_members, (double) psstate->ping.n_recv, ++p->mcast.n_times_counted, 10);
         debugs(15, DBG_IMPORTANT, "Group " << p->host  << ": " << psstate->ping.n_recv  <<
                " replies, "<< std::setw(4)<< std::setprecision(2) <<
                p->mcast.avg_n_members <<" average, RTT " << p->stats.rtt);
         p->mcast.n_replies_expected = (int) p->mcast.avg_n_members;
     }
 
-    cbdataReferenceDone(psstate->callback_data);
+    cbdataReferenceDone(psstate->peerCountMcastPeerXXX);
 
     fake->abort(); // sets ENTRY_ABORTED and initiates releated cleanup
     fake->mem_obj->request = nullptr;
     fake->unlock("peerCountMcastPeersDone");
     delete psstate;
 }
 
 static void
 peerCountHandleIcpReply(CachePeer * p, peer_t, AnyP::ProtocolType proto, void *, void *data)
 {
     ps_state *psstate = (ps_state *)data;
     StoreEntry *fake = psstate->entry;
     assert(fake);
     MemObject *mem = fake->mem_obj;
     assert(mem);
     int rtt = tvSubMsec(mem->start_ping, current_time);
     assert(proto == AnyP::PROTO_ICP);
     ++ psstate->ping.n_recv;
     int rtt_av_factor = RTT_AV_FACTOR;
 

=== modified file 'src/peer_select.cc'
--- src/peer_select.cc	2017-01-01 00:12:22 +0000
+++ src/peer_select.cc	2017-05-15 23:18:09 +0000
@@ -1,57 +1,83 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 /* DEBUG: section 44    Peer Selection Algorithm */
 
 #include "squid.h"
 #include "acl/FilledChecklist.h"
+#include "base/InstanceId.h"
 #include "CachePeer.h"
 #include "carp.h"
 #include "client_side.h"
 #include "dns/LookupDetails.h"
 #include "errorpage.h"
 #include "event.h"
 #include "FwdState.h"
 #include "globals.h"
 #include "hier_code.h"
 #include "htcp.h"
 #include "http/Stream.h"
 #include "HttpRequest.h"
 #include "icmp/net_db.h"
 #include "ICP.h"
 #include "ip/tools.h"
 #include "ipcache.h"
 #include "neighbors.h"
 #include "peer_sourcehash.h"
 #include "peer_userhash.h"
 #include "PeerSelectState.h"
 #include "SquidConfig.h"
 #include "SquidTime.h"
 #include "Store.h"
 #include "URL.h"
 
+/**
+ * A CachePeer which has been selected as a possible destination.
+ * Listed as pointers here so as to prevent duplicates being added but will
+ * be converted to a set of IP address path options before handing back out
+ * to the caller.
+ *
+ * Certain connection flags and outgoing settings will also be looked up and
+ * set based on the received request and CachePeer settings before handing back.
+ */
+class FwdServer
+{
+    MEMPROXY_CLASS(FwdServer);
+
+public:
+    FwdServer(CachePeer *p, hier_code c) :
+        _peer(p),
+        code(c),
+        next(nullptr)
+    {}
+
+    CbcPointer<CachePeer> _peer;                /* NULL --> origin server */
+    hier_code code;
+    FwdServer *next;
+};
+
 static struct {
     int timeouts;
 } PeerStats;
 
 static const char *DirectStr[] = {
     "DIRECT_UNKNOWN",
     "DIRECT_NO",
     "DIRECT_MAYBE",
     "DIRECT_YES"
 };
 
 static void peerSelectFoo(ps_state *);
 static void peerPingTimeout(void *data);
 static IRCB peerHandlePingReply;
 static void peerIcpParentMiss(CachePeer *, icp_common_t *, ps_state *);
 #if USE_HTCP
 static void peerHtcpParentMiss(CachePeer *, HtcpReplyData *, ps_state *);
 static void peerHandleHtcpReply(CachePeer *, peer_t, HtcpReplyData *, void *);
 #endif
 static int peerCheckNetdbDirect(ps_state * psstate);
@@ -105,80 +131,79 @@ peerSelectIcpPing(HttpRequest * request,
     int n;
     assert(entry);
     assert(entry->ping_status == PING_NONE);
     assert(direct != DIRECT_YES);
     debugs(44, 3, "peerSelectIcpPing: " << entry->url());
 
     if (!request->flags.hierarchical && direct != DIRECT_NO)
         return 0;
 
     if (EBIT_TEST(entry->flags, KEY_PRIVATE) && !neighbors_do_private_keys)
         if (direct != DIRECT_NO)
             return 0;
 
     n = neighborsCount(request);
 
     debugs(44, 3, "peerSelectIcpPing: counted " << n << " neighbors");
 
     return n;
 }
 
-void
-peerSelect(Comm::ConnectionList * paths,
+static void
+peerSelect(PeerSelectionInitiator *initiator,
            HttpRequest * request,
            AccessLogEntry::Pointer const &al,
-           StoreEntry * entry,
-           PSC * callback,
-           void *callback_data)
+           StoreEntry * entry)
 {
-    ps_state *psstate;
-
     if (entry)
         debugs(44, 3, *entry << ' ' << entry->url());
     else
         debugs(44, 3, request->method);
 
-    psstate = new ps_state;
+    const auto psstate = new ps_state(initiator);
 
     psstate->request = request;
     HTTPMSGLOCK(psstate->request);
     psstate->al = al;
 
     psstate->entry = entry;
-    psstate->paths = paths;
-
-    psstate->callback = callback;
-
-    psstate->callback_data = cbdataReference(callback_data);
 
 #if USE_CACHE_DIGESTS
 
     request->hier.peer_select_start = current_time;
 
 #endif
 
     if (psstate->entry)
         psstate->entry->lock("peerSelect");
 
     peerSelectFoo(psstate);
 }
 
+void
+PeerSelectionInitiator::startSelectingDestinations(HttpRequest *request, const AccessLogEntry::Pointer &ale, StoreEntry *entry)
+{
+    subscribed = true;
+    peerSelect(this, request, ale, entry);
+    // and wait for noteDestination() and/or noteDestinationsEnd() calls
+}
+
 static void
 peerCheckNeverDirectDone(allow_t answer, void *data)
 {
     ps_state *psstate = (ps_state *) data;
     psstate->acl_checklist = NULL;
     debugs(44, 3, "peerCheckNeverDirectDone: " << answer);
     psstate->never_direct = answer;
     switch (answer) {
     case ACCESS_ALLOWED:
         /** if never_direct says YES, do that. */
         psstate->direct = DIRECT_NO;
         debugs(44, 3, HERE << "direct = " << DirectStr[psstate->direct] << " (never_direct allow)");
         break;
     case ACCESS_DENIED: // not relevant.
     case ACCESS_DUNNO:  // not relevant.
         break;
     case ACCESS_AUTH_REQUIRED:
         debugs(44, DBG_IMPORTANT, "WARNING: never_direct resulted in " << answer << ". Username ACLs are not reliable here.");
         break;
     }
@@ -191,189 +216,176 @@ peerCheckAlwaysDirectDone(allow_t answer
     ps_state *psstate = (ps_state *)data;
     psstate->acl_checklist = NULL;
     debugs(44, 3, "peerCheckAlwaysDirectDone: " << answer);
     psstate->always_direct = answer;
     switch (answer) {
     case ACCESS_ALLOWED:
         /** if always_direct says YES, do that. */
         psstate->direct = DIRECT_YES;
         debugs(44, 3, HERE << "direct = " << DirectStr[psstate->direct] << " (always_direct allow)");
         break;
     case ACCESS_DENIED: // not relevant.
     case ACCESS_DUNNO:  // not relevant.
         break;
     case ACCESS_AUTH_REQUIRED:
         debugs(44, DBG_IMPORTANT, "WARNING: always_direct resulted in " << answer << ". Username ACLs are not reliable here.");
         break;
     }
     peerSelectFoo(psstate);
 }
 
+/// \returns true (after destroying psstate) if the initiator is gone or lost interest
+/// \returns false (without side effects) otherwise
+static bool
+peerSelectionAborted(ps_state *psstate)
+{
+    if (psstate->interestedInitiator())
+        return false;
+
+    debugs(44, 3, "Aborting peer selection: Initiator gone or lost interest.");
+    delete psstate;
+    return true;
+}
+
 void
 peerSelectDnsPaths(ps_state *psstate)
 {
-    FwdServer *fs = psstate->servers;
-
-    if (!cbdataReferenceValid(psstate->callback_data)) {
-        debugs(44, 3, "Aborting peer selection. Parent Job went away.");
-        delete psstate;
+    if (peerSelectionAborted(psstate))
         return;
-    }
+
+    FwdServer *fs = psstate->servers;
 
     // Bug 3243: CVE 2009-0801
     // Bypass of browser same-origin access control in intercepted communication
     // To resolve this we must use only the original client destination when going DIRECT
     // on intercepted traffic which failed Host verification
     const HttpRequest *req = psstate->request;
     const bool isIntercepted = !req->flags.redirected &&
                                (req->flags.intercepted || req->flags.interceptTproxy);
     const bool useOriginalDst = Config.onoff.client_dst_passthru || !req->flags.hostVerified;
     const bool choseDirect = fs && fs->code == HIER_DIRECT;
     if (isIntercepted && useOriginalDst && choseDirect) {
         // check the client is still around before using any of its details
         if (req->clientConnectionManager.valid()) {
             // construct a "result" adding the ORIGINAL_DST to the set instead of DIRECT
             Comm::ConnectionPointer p = new Comm::Connection();
             p->remote = req->clientConnectionManager->clientConnection->local;
-            p->peerType = ORIGINAL_DST; // fs->code is DIRECT. This fixes the display.
-            p->setPeer(fs->_peer.get());
-
-            // check for a configured outgoing address for this destination...
-            getOutgoingAddress(psstate->request, p);
-            psstate->paths->push_back(p);
+            fs->code = ORIGINAL_DST; // was HIER_DIRECT (see choseDirect); handlePath() copies it into peerType
+            psstate->handlePath(p, *fs);
         }
 
         // clear the used fs and continue
         psstate->servers = fs->next;
         delete fs;
         peerSelectDnsPaths(psstate);
         return;
     }
 
     // convert the list of FwdServer destinations into destinations IP addresses
-    if (fs && psstate->paths->size() < (unsigned int)Config.forward_max_tries) {
+    if (fs && psstate->wantsMoreDestinations()) {
         // send the next one off for DNS lookup.
         const char *host = fs->_peer.valid() ? fs->_peer->host : psstate->request->url.host();
         debugs(44, 2, "Find IP destination for: " << psstate->url() << "' via " << host);
         ipcache_nbgethostbyname(host, peerSelectDnsResults, psstate);
         return;
     }
 
     // Bug 3605: clear any extra listed FwdServer destinations, when the options exceeds max_foward_tries.
     // due to the allocation method of fs, we must deallocate each manually.
     // TODO: use a std::list so we can get the size and abort adding whenever the selection loops reach Config.forward_max_tries
-    if (fs && psstate->paths->size() >= (unsigned int)Config.forward_max_tries) {
+    if (fs) {
         assert(fs == psstate->servers);
         while (fs) {
             psstate->servers = fs->next;
             delete fs;
             fs = psstate->servers;
         }
     }
 
     // done with DNS lookups. pass back to caller
-    PSC *callback = psstate->callback;
-    psstate->callback = NULL;
 
-    debugs(44, 2, (psstate->paths->size()<1?"Failed to select source":"Found sources") << " for '" << psstate->url() << "'");
+    debugs(44, 2, psstate->id << " found all " << psstate->foundPaths << " destinations for " << psstate->url());
     debugs(44, 2, "  always_direct = " << psstate->always_direct);
     debugs(44, 2, "   never_direct = " << psstate->never_direct);
-    if (psstate->paths) {
-        for (size_t i = 0; i < psstate->paths->size(); ++i) {
-            if ((*psstate->paths)[i]->peerType == HIER_DIRECT)
-                debugs(44, 2, "         DIRECT = " << (*psstate->paths)[i]);
-            else if ((*psstate->paths)[i]->peerType == ORIGINAL_DST)
-                debugs(44, 2, "   ORIGINAL_DST = " << (*psstate->paths)[i]);
-            else if ((*psstate->paths)[i]->peerType == PINNED)
-                debugs(44, 2, "         PINNED = " << (*psstate->paths)[i]);
-            else
-                debugs(44, 2, "     cache_peer = " << (*psstate->paths)[i]);
-        }
-    }
     debugs(44, 2, "       timedout = " << psstate->ping.timedout);
 
     psstate->ping.stop = current_time;
-    psstate->request->hier.ping = psstate->ping;
+    psstate->request->hier.ping = psstate->ping; // final result
 
-    void *cbdata;
-    if (cbdataReferenceValidDone(psstate->callback_data, &cbdata)) {
-        callback(psstate->paths, psstate->lastError, cbdata);
-        psstate->lastError = NULL; // FwdState has taken control over the ErrorState object.
+    if (psstate->lastError && psstate->foundPaths) {
+        // nobody cares about errors if we found destinations despite them
+        debugs(44, 3, "forgetting the last error");
+        delete psstate->lastError;
+        psstate->lastError = nullptr;
     }
 
+    const auto finalError = psstate->lastError; psstate->lastError = nullptr; // we own it now
+    if (const auto initiator = psstate->interestedInitiator()) initiator->noteDestinationsEnd(finalError); // initiator owns it now
+    else delete finalError; // the initiator is gone or lost interest; do not leak the unused error
     delete psstate;
 }
 
 static void
 peerSelectDnsResults(const ipcache_addrs *ia, const Dns::LookupDetails &details, void *data)
 {
     ps_state *psstate = (ps_state *)data;
-
-    if (!cbdataReferenceValid(psstate->callback_data)) {
-        debugs(44, 3, "Aborting peer selection. Parent Job went away.");
-        delete psstate;
+    if (peerSelectionAborted(psstate))
         return;
-    }
 
     psstate->request->recordLookup(details);
 
     FwdServer *fs = psstate->servers;
     if (ia != NULL) {
 
         assert(ia->cur < ia->count);
 
         // loop over each result address, adding to the possible destinations.
         int ip = ia->cur;
         for (int n = 0; n < ia->count; ++n, ++ip) {
             Comm::ConnectionPointer p;
 
             if (ip >= ia->count) ip = 0; // looped back to zero.
 
-            // Enforce forward_max_tries configuration.
-            if (psstate->paths->size() >= (unsigned int)Config.forward_max_tries)
+            if (!psstate->wantsMoreDestinations())
                 break;
 
             // for TPROXY spoofing we must skip unusable addresses.
             if (psstate->request->flags.spoofClientIp && !(fs->_peer.valid() && fs->_peer->options.no_tproxy) ) {
                 if (ia->in_addrs[ip].isIPv4() != psstate->request->client_addr.isIPv4()) {
                     // we CAN'T spoof the address on this link. find another.
                     continue;
                 }
             }
 
             p = new Comm::Connection();
             p->remote = ia->in_addrs[ip];
 
             // when IPv6 is disabled we cannot use it
             if (!Ip::EnableIpv6 && p->remote.isIPv6()) {
                 const char *host = (fs->_peer.valid() ? fs->_peer->host : psstate->request->url.host());
                 ipcacheMarkBadAddr(host, p->remote);
                 continue;
             }
 
             p->remote.port(fs->_peer.valid() ? fs->_peer->http_port : psstate->request->url.port());
-            p->peerType = fs->code;
-            p->setPeer(fs->_peer.get());
 
-            // check for a configured outgoing address for this destination...
-            getOutgoingAddress(psstate->request, p);
-            psstate->paths->push_back(p);
+            psstate->handlePath(p, *fs);
         }
     } else {
         debugs(44, 3, "Unknown host: " << (fs->_peer.valid() ? fs->_peer->host : psstate->request->url.host()));
         // discard any previous error.
         delete psstate->lastError;
         psstate->lastError = NULL;
         if (fs->code == HIER_DIRECT) {
             psstate->lastError = new ErrorState(ERR_DNS_FAIL, Http::scServiceUnavailable, psstate->request);
             psstate->lastError->dnsError = details.error;
         }
     }
 
     psstate->servers = fs->next;
     delete fs;
 
     // see if more paths can be found
     peerSelectDnsPaths(psstate);
 }
 
 static int
@@ -405,45 +417,42 @@ peerCheckNetdbDirect(ps_state * psstate)
         return 1;
 
     p = whichPeer(psstate->closest_parent_miss);
 
     if (p == NULL)
         return 0;
 
     debugs(44, 3, "peerCheckNetdbDirect: closest_parent_miss RTT = " << psstate->ping.p_rtt << " msec");
 
     if (myrtt && myrtt <= psstate->ping.p_rtt)
         return 1;
 
 #endif /* USE_ICMP */
 
     return 0;
 }
 
 static void
 peerSelectFoo(ps_state * ps)
 {
-    if (!cbdataReferenceValid(ps->callback_data)) {
-        debugs(44, 3, "Aborting peer selection. Parent Job went away.");
-        delete ps;
+    if (peerSelectionAborted(ps))
         return;
-    }
 
     StoreEntry *entry = ps->entry;
     HttpRequest *request = ps->request;
     debugs(44, 3, request->method << ' ' << request->url.host());
 
     /** If we don't know whether DIRECT is permitted ... */
     if (ps->direct == DIRECT_UNKNOWN) {
         if (ps->always_direct == ACCESS_DUNNO) {
             debugs(44, 3, "peerSelectFoo: direct = " << DirectStr[ps->direct] << " (always_direct to be checked)");
             /** check always_direct; */
             ACLFilledChecklist *ch = new ACLFilledChecklist(Config.accessList.AlwaysDirect, request, NULL);
             ch->al = ps->al;
             ps->acl_checklist = ch;
             ps->acl_checklist->nonBlockingCheck(peerCheckAlwaysDirectDone, ps);
             return;
         } else if (ps->never_direct == ACCESS_DUNNO) {
             debugs(44, 3, "peerSelectFoo: direct = " << DirectStr[ps->direct] << " (never_direct to be checked)");
             /** check never_direct; */
             ACLFilledChecklist *ch = new ACLFilledChecklist(Config.accessList.NeverDirect, request, NULL);
             ch->al = ps->al;
@@ -725,53 +734,47 @@ peerGetAllParents(ps_state * ps)
 
         debugs(15, 3, "peerGetAllParents: adding alive parent " << p->host);
 
         peerAddFwdServer(&ps->servers, p, ANY_OLD_PARENT);
     }
 
     /* XXX: should add dead parents here, but it is currently
      * not possible to find out which parents are dead or which
      * simply are not configured to handle the request.
      */
     /* Add default parent as a last resort */
     if ((p = getDefaultParent(request))) {
         peerAddFwdServer(&ps->servers, p, DEFAULT_PARENT);
     }
 }
 
 static void
 peerPingTimeout(void *data)
 {
     ps_state *psstate = (ps_state *)data;
-    StoreEntry *entry = psstate->entry;
+    debugs(44, 3, psstate->url());
 
-    if (entry)
-        debugs(44, 3, psstate->url());
+    if (StoreEntry *entry = psstate->entry)
+        entry->ping_status = PING_DONE;
 
-    if (!cbdataReferenceValid(psstate->callback_data)) {
-        /* request aborted */
-        if (entry)
-            entry->ping_status = PING_DONE;
-        cbdataReferenceDone(psstate->callback_data);
-        delete psstate;
+    if (peerSelectionAborted(psstate))
         return;
-    }
 
     ++PeerStats.timeouts;
     psstate->ping.timedout = 1;
     peerSelectFoo(psstate);
 }
 
 void
 peerSelectInit(void)
 {
     memset(&PeerStats, '\0', sizeof(PeerStats));
 }
 
 static void
 peerIcpParentMiss(CachePeer * p, icp_common_t * header, ps_state * ps)
 {
     int rtt;
 
 #if USE_ICMP
     if (Config.onoff.query_icmp) {
         if (header->flags & ICP_FLAG_SRC_RTT) {
@@ -920,67 +923,116 @@ peerHandlePingReply(CachePeer * p, peer_
 #endif
 
     else
         debugs(44, DBG_IMPORTANT, "peerHandlePingReply: unknown protocol " << proto);
 }
 
 static void
 peerAddFwdServer(FwdServer ** FSVR, CachePeer * p, hier_code code)
 {
     debugs(44, 5, "peerAddFwdServer: adding " <<
            (p ? p->host : "DIRECT")  << " " <<
            hier_code_str[code]  );
     FwdServer *fs = new FwdServer(p, code);
 
     while (*FSVR)
         FSVR = &(*FSVR)->next;
 
     *FSVR = fs;
 }
 
-ps_state::ps_state() : request (NULL),
+ps_state::ps_state(PeerSelectionInitiator *initiator):
+    request(nullptr),
     entry (NULL),
     always_direct(Config.accessList.AlwaysDirect?ACCESS_DUNNO:ACCESS_DENIED),
     never_direct(Config.accessList.NeverDirect?ACCESS_DUNNO:ACCESS_DENIED),
     direct(DIRECT_UNKNOWN),
-    callback (NULL),
-    callback_data (NULL),
     lastError(NULL),
-    paths(NULL),
     servers (NULL),
     first_parent_miss(),
     closest_parent_miss(),
     hit(NULL),
     hit_type(PEER_NONE),
-    acl_checklist (NULL)
+    acl_checklist (NULL),
+    initiator_(initiator)
 {
     ; // no local defaults.
 }
 
 const SBuf
 ps_state::url() const
 {
     if (entry)
         return SBuf(entry->url());
 
     if (request)
         return request->effectiveRequestUri();
 
     static const SBuf noUrl("[no URL]");
     return noUrl;
 }
 
+/// \returns valid/interested peer initiator or nil
+PeerSelectionInitiator *
+ps_state::interestedInitiator()
+{
+    const auto initiator = initiator_.valid();
+
+    if (!initiator) {
+        debugs(44, 3, id << " initiator gone");
+        return nullptr;
+    }
+
+    if (!initiator->subscribed) {
+        debugs(44, 3, id << " initiator lost interest");
+        return nullptr;
+    }
+
+    return initiator;
+}
+
+bool
+ps_state::wantsMoreDestinations() const
+{
+    const auto maxCount = Config.forward_max_tries; // negative values allow no destinations at all
+    return maxCount >= 0 && foundPaths < static_cast<std::make_unsigned<decltype(maxCount)>::type>(maxCount);
+}
+
+void
+ps_state::handlePath(Comm::ConnectionPointer &path, FwdServer &fs)
+{
+    ++foundPaths;
+
+    path->peerType = fs.code;
+    path->setPeer(fs._peer.get());
+
+    // check for a configured outgoing address for this destination...
+    getOutgoingAddress(request, path);
+
+    request->hier.ping = ping; // may be updated later
+
+    debugs(44, 2, id << " found " << path << ", destination #" << foundPaths << " for " << url());
+    debugs(44, 2, "  always_direct = " << always_direct);
+    debugs(44, 2, "   never_direct = " << never_direct);
+    debugs(44, 2, "       timedout = " << ping.timedout);
+
+    if (const auto initiator = interestedInitiator())
+        initiator->noteDestination(path);
+}
+
+InstanceIdDefinitions(ps_state, "PeerSelector");
+
 ping_data::ping_data() :
     n_sent(0),
     n_recv(0),
     n_replies_expected(0),
     timeout(0),
     timedout(0),
     w_rtt(0),
     p_rtt(0)
 {
     start.tv_sec = 0;
     start.tv_usec = 0;
     stop.tv_sec = 0;
     stop.tv_usec = 0;
 }
 

=== modified file 'src/tests/stub_comm.cc'
--- src/tests/stub_comm.cc	2017-01-01 00:12:22 +0000
+++ src/tests/stub_comm.cc	2017-05-15 23:27:22 +0000
@@ -1,36 +1,38 @@
 /*
  * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #include "squid.h"
 #include "comm/Connection.h"
 #include "comm/Loops.h"
 #include "fde.h"
 
 #define STUB_API "comm.cc"
 #include "tests/STUB.h"
 
+#include <ostream>
+
 void comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, IOCB *handler, void *handler_data) STUB
 void comm_read(const Comm::ConnectionPointer &conn, char*, int, AsyncCall::Pointer &callback) STUB
 
 /* should be in stub_CommRead */
 #include "CommRead.h"
 CommRead::CommRead(const Comm::ConnectionPointer &, char *, int, AsyncCall::Pointer &) STUB
 CommRead::CommRead() STUB
 DeferredReadManager::~DeferredReadManager() STUB
 DeferredRead::DeferredRead(DeferrableRead *, void *, CommRead const &) STUB
 void DeferredReadManager::delayRead(DeferredRead const &aRead) STUB
 void DeferredReadManager::kickReads(int const count) STUB
 
 #include "comm.h"
 bool comm_iocallbackpending(void) STUB_RETVAL(false)
 int commSetNonBlocking(int fd) STUB_RETVAL(Comm::COMM_ERROR)
 int commUnsetNonBlocking(int fd) STUB_RETVAL(-1)
 void commSetCloseOnExec(int fd) STUB_NOP
 void commSetTcpKeepalive(int fd, int idle, int interval, int timeout) STUB
 void _comm_close(int fd, char const *file, int line) STUB
 void old_comm_reset_close(int fd) STUB
@@ -52,20 +54,21 @@ int comm_udp_sendto(int sock, const Ip::
 void commCallCloseHandlers(int fd) STUB
 void commUnsetFdTimeout(int fd) STUB
 int commSetTimeout(const Comm::ConnectionPointer &, int, AsyncCall::Pointer&) STUB_RETVAL(-1)
 int commSetConnTimeout(const Comm::ConnectionPointer &conn, int seconds, AsyncCall::Pointer &callback) STUB_RETVAL(-1)
 int commUnsetConnTimeout(const Comm::ConnectionPointer &conn) STUB_RETVAL(-1)
 int ignoreErrno(int ierrno) STUB_RETVAL(-1)
 void commCloseAllSockets(void) STUB
 void checkTimeouts(void) STUB
 AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *, void *) STUB
 void comm_add_close_handler(int fd, AsyncCall::Pointer &) STUB
 void comm_remove_close_handler(int fd, CLCB *, void *) STUB
 void comm_remove_close_handler(int fd, AsyncCall::Pointer &)STUB
 int comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, Ip::Address &from) STUB_RETVAL(-1)
 int comm_udp_recv(int fd, void *buf, size_t len, int flags) STUB_RETVAL(-1)
 ssize_t comm_udp_send(int s, const void *buf, size_t len, int flags) STUB_RETVAL(-1)
 bool comm_has_incomplete_write(int) STUB_RETVAL(false)
 void commStartHalfClosedMonitor(int fd) STUB
 bool commHasHalfClosedMonitor(int fd) STUB_RETVAL(false)
 int CommSelectEngine::checkEvents(int timeout) STUB_RETVAL(0)
 
+std::ostream &operator << (std::ostream &os, const Comm::Connection &conn) STUB_RETVAL(os << "[Connection object]")

=== modified file 'src/tunnel.cc'
--- src/tunnel.cc	2017-04-12 23:34:50 +0000
+++ src/tunnel.cc	2017-05-15 23:17:20 +0000
@@ -46,47 +46,47 @@
 #if USE_DELAY_POOLS
 #include "DelayId.h"
 #endif
 
 #include <climits>
 #include <cerrno>
 
 /**
  * TunnelStateData is the state engine performing the tasks for
  * setup of a TCP tunnel from an existing open client FD to a server
  * then shuffling binary data between the resulting FD pair.
  */
 /*
  * TODO 1: implement a read/write API on ConnStateData to send/receive blocks
  * of pre-formatted data. Then we can use that as the client side of the tunnel
  * instead of re-implementing it here and occasionally getting the ConnStateData
  * read/write state wrong.
  *
  * TODO 2: then convert this into a AsyncJob, possibly a child of 'Server'
  */
-class TunnelStateData
+class TunnelStateData: public PeerSelectionInitiator
 {
-    CBDATA_CLASS(TunnelStateData);
+    CBDATA_CHILD(TunnelStateData);
 
 public:
     TunnelStateData(ClientHttpRequest *);
-    ~TunnelStateData();
+    virtual ~TunnelStateData();
     TunnelStateData(const TunnelStateData &); // do not implement
     TunnelStateData &operator =(const TunnelStateData &); // do not implement
 
     class Connection;
     static void ReadClient(const Comm::ConnectionPointer &, char *buf, size_t len, Comm::Flag errcode, int xerrno, void *data);
     static void ReadServer(const Comm::ConnectionPointer &, char *buf, size_t len, Comm::Flag errcode, int xerrno, void *data);
     static void WriteClientDone(const Comm::ConnectionPointer &, char *buf, size_t len, Comm::Flag flag, int xerrno, void *data);
     static void WriteServerDone(const Comm::ConnectionPointer &, char *buf, size_t len, Comm::Flag flag, int xerrno, void *data);
 
     /// Starts reading peer response to our CONNECT request.
     void readConnectResponse();
 
     /// Called when we may be done handling a CONNECT exchange with the peer.
     void connectExchangeCheckpoint();
 
     bool noConnections() const;
     char *url;
     CbcPointer<ClientHttpRequest> http;
     HttpRequest::Pointer request;
     AccessLogEntryPointer al;
@@ -109,40 +109,42 @@ public:
         // have already responded to that CONNECT before tunnel.cc started.
         if (request && request->flags.forceTunnel)
             return false;
 #if USE_OPENSSL
         // We are bumping and we had already send "OK CONNECTED"
         if (http.valid() && http->getConn() && http->getConn()->serverBump() && http->getConn()->serverBump()->step > Ssl::bumpStep1)
             return false;
 #endif
         return !(request != NULL &&
                  (request->flags.interceptTproxy || request->flags.intercepted));
     }
 
     /// Sends "502 Bad Gateway" error response to the client,
     /// if it is waiting for Squid CONNECT response, closing connections.
     void informUserOfPeerError(const char *errMsg, size_t);
 
     /// starts connecting to the next hop, either for the first time or while
     /// recovering from the previous connect failure
     void startConnecting();
 
+    void noteConnectFailure(const Comm::ConnectionPointer &conn);
+
     class Connection
     {
 
     public:
         Connection() : len (0), buf ((char *)xmalloc(SQUID_TCP_SO_RCVBUF)), size_ptr(NULL), delayedLoops(0),
             readPending(NULL), readPendingFunc(NULL) {}
 
         ~Connection();
 
         int bytesWanted(int lower=0, int upper = INT_MAX) const;
         void bytesIn(int const &);
 #if USE_DELAY_POOLS
 
         void setDelayId(DelayId const &);
 #endif
 
         void error(int const xerrno);
         int debugLevelForError(int const xerrno) const;
         void closeIfOpen();
         void dataSent (size_t amount);
@@ -164,92 +166,101 @@ public:
 
         DelayId delayId;
 #endif
 
     };
 
     Connection client, server;
     int *status_ptr;        ///< pointer for logging HTTP status
     LogTags *logTag_ptr;    ///< pointer for logging Squid processing code
     MemBuf *connectRespBuf; ///< accumulates peer CONNECT response when we need it
     bool connectReqWriting; ///< whether we are writing a CONNECT request to a peer
     SBuf preReadClientData;
     SBuf preReadServerData;
     time_t startTime; ///< object creation time, before any peer selection/connection attempts
 
     void copyRead(Connection &from, IOCB *completion);
 
     /// continue to set up connection to a peer, going async for SSL peers
     void connectToPeer();
 
+    /* PeerSelectionInitiator API */
+    virtual void noteDestination(Comm::ConnectionPointer conn) override;
+    virtual void noteDestinationsEnd(ErrorState *selectionError) override;
+
+    void saveError(ErrorState *finalError);
+    void sendError(ErrorState *finalError, const char *reason);
+
 private:
     /// Gives Security::PeerConnector access to Answer in the TunnelStateData callback dialer.
     class MyAnswerDialer: public CallDialer, public Security::PeerConnector::CbDialer
     {
     public:
         typedef void (TunnelStateData::*Method)(Security::EncryptorAnswer &);
 
         MyAnswerDialer(Method method, TunnelStateData *tunnel):
             method_(method), tunnel_(tunnel), answer_() {}
 
         /* CallDialer API */
         virtual bool canDial(AsyncCall &call) { return tunnel_.valid(); }
         void dial(AsyncCall &call) { ((&(*tunnel_))->*method_)(answer_); }
         virtual void print(std::ostream &os) const {
             os << '(' << tunnel_.get() << ", " << answer_ << ')';
         }
 
         /* Security::PeerConnector::CbDialer API */
         virtual Security::EncryptorAnswer &answer() { return answer_; }
 
     private:
         Method method_;
         CbcPointer<TunnelStateData> tunnel_;
         Security::EncryptorAnswer answer_;
     };
 
     /// callback handler after connection setup (including any encryption)
     void connectedToPeer(Security::EncryptorAnswer &answer);
 
+    /// details of the "last tunneling attempt" failure (if it failed)
+    ErrorState *savedError = nullptr;
+
 public:
     bool keepGoingAfterRead(size_t len, Comm::Flag errcode, int xerrno, Connection &from, Connection &to);
     void copy(size_t len, Connection &from, Connection &to, IOCB *);
     void handleConnectResponse(const size_t chunkSize);
     void readServer(char *buf, size_t len, Comm::Flag errcode, int xerrno);
     void readClient(char *buf, size_t len, Comm::Flag errcode, int xerrno);
     void writeClientDone(char *buf, size_t len, Comm::Flag flag, int xerrno);
     void writeServerDone(char *buf, size_t len, Comm::Flag flag, int xerrno);
 
     static void ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, Comm::Flag errcode, int xerrno, void *data);
     void readConnectResponseDone(char *buf, size_t len, Comm::Flag errcode, int xerrno);
     void copyClientBytes();
     void copyServerBytes();
 };
 
 static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n";
 
 static CNCB tunnelConnectDone;
 static ERCB tunnelErrorComplete;
 static CLCB tunnelServerClosed;
 static CLCB tunnelClientClosed;
 static CTCB tunnelTimeout;
-static PSC tunnelPeerSelectComplete;
 static EVH tunnelDelayedClientRead;
 static EVH tunnelDelayedServerRead;
 static void tunnelConnected(const Comm::ConnectionPointer &server, void *);
 static void tunnelRelayConnectRequest(const Comm::ConnectionPointer &server, void *);
 
 static void
 tunnelServerClosed(const CommCloseCbParams &params)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)params.data;
     debugs(26, 3, HERE << tunnelState->server.conn);
     tunnelState->server.conn = NULL;
     tunnelState->server.writer = NULL;
 
     if (tunnelState->request != NULL)
         tunnelState->request->hier.stopPeerClock(false);
 
     if (tunnelState->noConnections()) {
         // ConnStateData pipeline should contain the CONNECT we are performing
         // but it may be invalid already (bug 4392)
         if (tunnelState->http.valid() && tunnelState->http->getConn()) {
@@ -310,40 +321,41 @@ TunnelStateData::TunnelStateData(ClientH
     status_ptr = &clientRequest->al->http.code;
     logTag_ptr = &clientRequest->logType;
     al = clientRequest->al;
     http = clientRequest;
 
     client.conn = clientRequest->getConn()->clientConnection;
     comm_add_close_handler(client.conn->fd, tunnelClientClosed, this);
 
     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                      CommTimeoutCbPtrFun(tunnelTimeout, this));
     commSetConnTimeout(client.conn, Config.Timeout.lifetime, timeoutCall);
 }
 
 TunnelStateData::~TunnelStateData()
 {
     debugs(26, 3, "TunnelStateData destructed this=" << this);
     assert(noConnections());
     xfree(url);
     serverDestinations.clear();
     delete connectRespBuf;
+    delete savedError; // may be nil; sendError() nils it when handing it off
 }
 
 TunnelStateData::Connection::~Connection()
 {
     if (readPending)
         eventDelete(readPendingFunc, readPending);
 
     safe_free(buf);
 }
 
 int
 TunnelStateData::Connection::bytesWanted(int lowerbound, int upperbound) const
 {
 #if USE_DELAY_POOLS
     return delayId.bytesWanted(lowerbound, upperbound);
 #else
 
     return upperbound;
 #endif
 }
@@ -430,48 +442,43 @@ TunnelStateData::readConnectResponseDone
     if (keepGoingAfterRead(len, errcode, xerrno, server, client))
         handleConnectResponse(len);
 }
 
 void
 TunnelStateData::informUserOfPeerError(const char *errMsg, const size_t sz)
 {
     server.len = 0;
 
     if (logTag_ptr)
         *logTag_ptr = LOG_TCP_TUNNEL;
 
     if (!clientExpectsConnectResponse()) {
         // closing the connection is the best we can do here
         debugs(50, 3, server.conn << " closing on error: " << errMsg);
         server.conn->close();
         return;
     }
 
     // if we have no reply suitable to relay, use 502 Bad Gateway
-    if (!sz || sz > static_cast<size_t>(connectRespBuf->contentSize())) {
-        ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw());
-        *status_ptr = Http::scBadGateway;
-        err->callback = tunnelErrorComplete;
-        err->callback_data = this;
-        errorSend(http->getConn()->clientConnection, err);
-        return;
-    }
+    if (!sz || sz > static_cast<size_t>(connectRespBuf->contentSize()))
+        return sendError(new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw()),
+                         "peer error without reply"); // sendError() also sets *status_ptr
 
     // if we need to send back the server response. write its headers to the client
     server.len = sz;
     memcpy(server.buf, connectRespBuf->content(), server.len);
     copy(server.len, server, client, TunnelStateData::WriteClientDone);
     // then close the server FD to prevent any relayed keep-alive causing CVE-2015-5400
     server.closeIfOpen();
 }
 
 /* Read from client side and queue it for writing to the server */
 void
 TunnelStateData::ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, Comm::Flag errcode, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));
 
     tunnelState->readConnectResponseDone(buf, len, errcode, xerrno);
 }
 
 /// Parses [possibly incomplete] CONNECT response and reacts to it.
@@ -964,75 +971,81 @@ tunnelConnected(const Comm::ConnectionPo
         tunnelState->client.write(conn_established, strlen(conn_established), call, NULL);
     }
 }
 
 static void
 tunnelErrorComplete(int fd/*const Comm::ConnectionPointer &*/, void *data, size_t)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     debugs(26, 3, HERE << "FD " << fd);
     assert(tunnelState != NULL);
     /* temporary lock to save our own feets (comm_close -> tunnelClientClosed -> Free) */
     CbcPointer<TunnelStateData> safetyLock(tunnelState);
 
     if (Comm::IsConnOpen(tunnelState->client.conn))
         tunnelState->client.conn->close();
 
     if (Comm::IsConnOpen(tunnelState->server.conn))
         tunnelState->server.conn->close();
 }
 
+/// reacts to a failed connect to the front destination: retries, waits for more paths, or fails
+void
+TunnelStateData::noteConnectFailure(const Comm::ConnectionPointer &conn)
+{
+    debugs(26, 4, "removing the failed one from " << serverDestinations.size() <<
+           " destinations: " << conn);
+
+    if (CachePeer *peer = conn->getPeer())
+        peerConnectFailed(peer);
+
+    assert(!serverDestinations.empty());
+    serverDestinations.erase(serverDestinations.begin()); // assumes conn was the front path
+
+    // Since no TCP payload has been passed to client or server, we may
+    // TCP-connect to other destinations (including alternate IPs).
+
+    if (!FwdState::EnoughTimeToReForward(startTime))
+        return sendError(savedError, "forwarding timeout"); // relies on a prior saveError() call
+
+    if (!serverDestinations.empty())
+        return startConnecting();
+
+    if (!PeerSelectionInitiator::subscribed)
+        return sendError(savedError, "tried all destinations");
+
+    debugs(26, 4, "wait for more destinations to try");
+    // noteDestination() or noteDestinationsEnd() will resume (or end) this transaction
+}
+
 static void
 tunnelConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
 
     if (status != Comm::OK) {
-        debugs(26, 4, HERE << conn << ", comm failure recovery.");
-        {
-            assert(!tunnelState->serverDestinations.empty());
-            const Comm::Connection &failedDest = *tunnelState->serverDestinations.front();
-            if (CachePeer *peer = failedDest.getPeer())
-                peerConnectFailed(peer);
-            debugs(26, 4, "removing the failed one from " << tunnelState->serverDestinations.size() <<
-                   " destinations: " << failedDest);
-        }
-        /* At this point only the TCP handshake has failed. no data has been passed.
-         * we are allowed to re-try the TCP-level connection to alternate IPs for CONNECT.
-         */
-        tunnelState->serverDestinations.erase(tunnelState->serverDestinations.begin());
-        if (!tunnelState->serverDestinations.empty() && FwdState::EnoughTimeToReForward(tunnelState->startTime)) {
-            debugs(26, 4, "re-forwarding");
-            tunnelState->startConnecting();
-        } else {
-            debugs(26, 4, HERE << "terminate with error.");
-            ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, Http::scServiceUnavailable, tunnelState->request.getRaw());
-            *tunnelState->status_ptr = Http::scServiceUnavailable;
-            err->xerrno = xerrno;
-            // on timeout is this still:    err->xerrno = ETIMEDOUT;
-            err->port = conn->remote.port();
-            err->callback = tunnelErrorComplete;
-            err->callback_data = tunnelState;
-            errorSend(tunnelState->client.conn, err);
-            if (tunnelState->request != NULL)
-                tunnelState->request->hier.stopPeerClock(false);
-        }
+        ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, Http::scServiceUnavailable, tunnelState->request.getRaw());
+        err->xerrno = xerrno;
+        // on timeout is this still:    err->xerrno = ETIMEDOUT;
+        err->port = conn->remote.port();
+        tunnelState->saveError(err);
+        tunnelState->noteConnectFailure(conn);
         return;
     }
 
 #if USE_DELAY_POOLS
     /* no point using the delayIsNoDelay stuff since tunnel is nice and simple */
     if (conn->getPeer() && conn->getPeer()->options.no_delay)
         tunnelState->server.setDelayId(DelayId());
 #endif
 
     tunnelState->request->hier.note(conn, tunnelState->getHost());
 
     tunnelState->server.conn = conn;
     tunnelState->request->peer_host = conn->getPeer() ? conn->getPeer()->host : NULL;
     comm_add_close_handler(conn->fd, tunnelServerClosed, tunnelState);
 
     debugs(26, 4, HERE << "determine post-connect handling pathway.");
     if (conn->getPeer()) {
         tunnelState->request->peer_login = conn->getPeer()->login;
         tunnelState->request->peer_domain = conn->getPeer()->domain;
         tunnelState->request->flags.auth_no_keytab = conn->getPeer()->options.auth_no_keytab;
@@ -1079,74 +1092,67 @@ tunnelStart(ClientHttpRequest * http)
         ACLFilledChecklist ch(Config.accessList.miss, request, NULL);
         ch.src_addr = request->client_addr;
         ch.my_addr = request->my_addr;
         if (ch.fastCheck() == ACCESS_DENIED) {
             debugs(26, 4, HERE << "MISS access forbidden.");
             err = new ErrorState(ERR_FORWARDING_DENIED, Http::scForbidden, request);
             http->al->http.code = Http::scForbidden;
             errorSend(http->getConn()->clientConnection, err);
             return;
         }
     }
 
     debugs(26, 3, request->method << ' ' << url << ' ' << request->http_ver);
     ++statCounter.server.all.requests;
     ++statCounter.server.other.requests;
 
     tunnelState = new TunnelStateData(http);
 #if USE_DELAY_POOLS
     //server.setDelayId called from tunnelConnectDone after server side connection established
 #endif
-
-    peerSelect(&(tunnelState->serverDestinations), request, http->al,
-               NULL,
-               tunnelPeerSelectComplete,
-               tunnelState);
+    tunnelState->startSelectingDestinations(request, http->al, nullptr);
 }
 
 void
 TunnelStateData::connectToPeer()
 {
     if (CachePeer *p = server.conn->getPeer()) {
         if (p->secure.encryptTransport) {
             AsyncCall::Pointer callback = asyncCall(5,4,
                                                     "TunnelStateData::ConnectedToPeer",
                                                     MyAnswerDialer(&TunnelStateData::connectedToPeer, this));
             auto *connector = new Security::BlindPeerConnector(request, server.conn, callback, al);
             AsyncJob::Start(connector); // will call our callback
             return;
         }
     }
 
     Security::EncryptorAnswer nil;
     connectedToPeer(nil);
 }
 
 void
 TunnelStateData::connectedToPeer(Security::EncryptorAnswer &answer)
 {
     if (ErrorState *error = answer.error.get()) {
-        *status_ptr = error->httpStatus;
-        error->callback = tunnelErrorComplete;
-        error->callback_data = this;
-        errorSend(client.conn, error);
-        answer.error.clear(); // preserve error for errorSendComplete()
+        answer.error.clear(); // detach without destroying it; sendError() will own the error
+        sendError(error, "TLS peer connection error");
         return;
     }
 
     tunnelRelayConnectRequest(server.conn, this);
 }
 
 static void
 tunnelRelayConnectRequest(const Comm::ConnectionPointer &srv, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert(!tunnelState->waitingForConnectExchange());
     HttpHeader hdr_out(hoRequest);
     Http::StateFlags flags;
     debugs(26, 3, HERE << srv << ", tunnelState=" << tunnelState);
     memset(&flags, '\0', sizeof(flags));
     flags.proxying = tunnelState->request->flags.proxying;
     MemBuf mb;
     mb.init();
     mb.appendf("CONNECT %s HTTP/1.1\r\n", tunnelState->url);
     HttpStateData::httpBuildRequestHeader(tunnelState->request.getRaw(),
@@ -1177,86 +1183,130 @@ tunnelRelayConnectRequest(const Comm::Co
 
     assert(tunnelState->waitingForConnectExchange());
 
     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                      CommTimeoutCbPtrFun(tunnelTimeout, tunnelState));
     commSetConnTimeout(srv, Config.Timeout.read, timeoutCall);
 }
 
 static Comm::ConnectionPointer
 borrowPinnedConnection(HttpRequest *request, Comm::ConnectionPointer &serverDestination)
 {
     // pinned_connection may become nil after a pconn race
     if (ConnStateData *pinned_connection = request ? request->pinnedConnection() : nullptr) {
         Comm::ConnectionPointer serverConn = pinned_connection->borrowPinnedConnection(request, serverDestination->getPeer());
         return serverConn;
     }
 
     return nullptr;
 }
 
-static void
-tunnelPeerSelectComplete(Comm::ConnectionList *peer_paths, ErrorState *err, void *data)
+void
+TunnelStateData::noteDestination(Comm::ConnectionPointer path)
 {
-    TunnelStateData *tunnelState = (TunnelStateData *)data;
+    const bool wasBlocked = serverDestinations.empty(); // empty: idle, waiting for a path
+    serverDestinations.push_back(path);
+    if (wasBlocked)
+        startConnecting();
+    // else continue to use one of the previously noted destinations;
+    // if all of them fail, we may try this path
+}
 
-    bool bail = false;
-    if (!peer_paths || peer_paths->empty()) {
-        debugs(26, 3, HERE << "No paths found. Aborting CONNECT");
-        bail = true;
-    }
+void
+TunnelStateData::noteDestinationsEnd(ErrorState *selectionError)
+{
+    PeerSelectionInitiator::subscribed = false; // path selection is over
+    if (const bool wasBlocked = serverDestinations.empty()) { // nothing left to try
 
-    if (!bail && tunnelState->serverDestinations[0]->peerType == PINNED) {
-        Comm::ConnectionPointer serverConn = borrowPinnedConnection(tunnelState->request.getRaw(), tunnelState->serverDestinations[0]);
-        debugs(26,7, "pinned peer connection: " << serverConn);
-        if (Comm::IsConnOpen(serverConn)) {
-            tunnelConnectDone(serverConn, Comm::OK, 0, (void *)tunnelState);
-            return;
-        }
-        bail = true;
-    }
+        if (selectionError)
+            return sendError(selectionError, "path selection has failed");
 
-    if (bail) {
-        if (!err) {
-            err = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, tunnelState->request.getRaw());
-        }
-        *tunnelState->status_ptr = err->httpStatus;
-        err->callback = tunnelErrorComplete;
-        err->callback_data = tunnelState;
-        errorSend(tunnelState->client.conn, err);
-        return;
+        if (savedError)
+            return sendError(savedError, "all found paths have failed");
+        // no selection error and no failed connection attempts to report
+        return sendError(new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw()),
+                         "path selection found no paths");
     }
-    delete err;
+    // else continue to use one of the previously noted destinations;
+    // if all of them fail, tunneling as whole will fail
+    Must(!selectionError); // finding at least one path means selection succeeded
+}
 
-    if (tunnelState->request != NULL)
-        tunnelState->request->hier.startPeerClock();
+/// remembers (and owns) an error to send if there will be no more connection attempts
+void
+TunnelStateData::saveError(ErrorState *error)
+{
+    debugs(26, 4, savedError << " ? " << error);
+    assert(error);
+    delete savedError; // may be nil
+    savedError = error;
+}
+
+/// Starts sending the given error message to the client, leading to the eventual
+/// transaction termination. Takes ownership of finalError, which may be savedError.
+void
+TunnelStateData::sendError(ErrorState *finalError, const char *reason)
+{
+    debugs(26, 3, "aborting transaction for " << reason);
+
+    if (request)
+        request->hier.stopPeerClock(false);
+
+    assert(finalError);
 
-    debugs(26, 3, "paths=" << peer_paths->size() << ", p[0]={" << (*peer_paths)[0] << "}, serverDest[0]={" <<
-           tunnelState->serverDestinations[0] << "}");
+    // get rid of any cached error unless that is what the caller is sending
+    if (savedError != finalError)
+        delete savedError; // may be nil
+    savedError = nullptr;
 
-    tunnelState->startConnecting();
+    // we cannot try other destinations after responding with an error
+    PeerSelectionInitiator::subscribed = false; // may already be false
+
+    *status_ptr = finalError->httpStatus;
+    finalError->callback = tunnelErrorComplete;
+    finalError->callback_data = this;
+    errorSend(client.conn, finalError);
 }
 
 void
 TunnelStateData::startConnecting()
 {
+    if (request)
+        request->hier.startPeerClock(); // also reached on every retry
+    // connect to the front destination; tunnelConnectDone() handles the outcome
+    assert(!serverDestinations.empty());
     Comm::ConnectionPointer &dest = serverDestinations.front();
+    debugs(26, 3, "to " << dest);
+
+    if (dest->peerType == PINNED) {
+        Comm::ConnectionPointer serverConn = borrowPinnedConnection(request.getRaw(), dest);
+        debugs(26,7, "pinned peer connection: " << serverConn);
+        if (Comm::IsConnOpen(serverConn)) {
+            tunnelConnectDone(serverConn, Comm::OK, 0, (void *)this);
+            return;
+        }
+        // a PINNED path failure is fatal; do not wait for more paths
+        sendError(new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw()),
+                  "pinned path failure");
+        return;
+    }
+
     GetMarkingsToServer(request.getRaw(), *dest);
 
     const time_t connectTimeout = dest->connectTimeout(startTime);
     AsyncCall::Pointer call = commCbCall(26,3, "tunnelConnectDone", CommConnectCbPtrFun(tunnelConnectDone, this));
     Comm::ConnOpener *cs = new Comm::ConnOpener(dest, call, connectTimeout);
     cs->setHost(url);
     AsyncJob::Start(cs);
 }
 
 CBDATA_CLASS_INIT(TunnelStateData);
 
 bool
 TunnelStateData::noConnections() const
 {
     return !Comm::IsConnOpen(server.conn) && !Comm::IsConnOpen(client.conn);
 }
 
 #if USE_DELAY_POOLS
 void
 TunnelStateData::Connection::setDelayId(DelayId const &newDelay)

_______________________________________________
squid-dev mailing list
squid-dev@lists.squid-cache.org
http://lists.squid-cache.org/listinfo/squid-dev

Reply via email to