Hello,

    When talking to a cache_peer (i.e., sending a CONNECT request before
tunneling the transaction), tunnel code is using a clever hack: Squid
does not parse the CONNECT response from peer but blindly forwards it to
the client. This works great and simplifies code a lot, except when the
client connection was intercepted and, hence, the client did not send a
CONNECT request and is not expecting a CONNECT response.

In those situations, the patch accumulates, parses, and strips the peer
CONNECT response (or closes connection on errors).

The existing tunnel I/O code is too simple to accommodate that task --
it cannot accumulate read data (its I/O buffers work in lockstep
fashion, writing everything it reads before reading again). Instead of
rewriting the entire tunnel code to use more complex buffers, I added a
temporary accumulation buffer for the CONNECT response. That buffer is
not allocated unless it is needed and does not grow beyond
SQUID_TCP_SO_RCVBUF size, just like the simple buffers.


Thank you,

Alex.
Support forwarding intercepted but not bumped connections to cache_peers.

When talking to a cache_peer (i.e., sending a CONNECT request before tunneling
the transaction), tunnel code is using a clever hack: Squid does not parse
the CONNECT response from peer but blindly forwards it to the client. This
works great and simplifies code a lot, except when the client connection
was intercepted and, hence, the client did not send a CONNECT request and
is not expecting a CONNECT response.

In those situations, we now accumulate, parse, and strip the peer CONNECT
response (or close connection on errors). 

The existing tunnel I/O code is too simple to accommodate that task -- it
cannot accumulate read data (its I/O buffers work in lockstep fashion, writing
everything it reads before reading again). Instead of rewriting the entire
tunnel code to use more complex buffers, I added a temporary accumulation
buffer for the CONNECT response. That buffer is not allocated unless it is
needed and does not grow beyond SQUID_TCP_SO_RCVBUF size, just like the
simple buffers.

=== modified file 'src/tunnel.cc'
--- src/tunnel.cc	2013-05-13 03:57:03 +0000
+++ src/tunnel.cc	2013-05-24 22:58:59 +0000
@@ -74,93 +74,120 @@
  * instead of re-implementing it here and occasionally getting the ConnStateData
  * read/write state wrong.
  *
  * TODO 2: then convert this into a AsyncJob, possibly a child of 'Server'
  */
 class TunnelStateData
 {
 
 public:
     TunnelStateData();
     ~TunnelStateData();
     TunnelStateData(const TunnelStateData &); // do not implement
     TunnelStateData &operator =(const TunnelStateData &); // do not implement
 
     class Connection;
     static void ReadClient(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data);
     static void ReadServer(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data);
     static void WriteClientDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data);
     static void WriteServerDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data);
 
+    /// Starts reading peer response to our CONNECT request.
+    void readConnectResponse();
+
+    /// Called when we may be done handling a CONNECT exchange with the peer.
+    void connectExchangeCheckpoint();
+
     bool noConnections() const;
     char *url;
     HttpRequest *request;
     Comm::ConnectionList serverDestinations;
 
     const char * getHost() const {
         return (server.conn != NULL && server.conn->getPeer() ? server.conn->getPeer()->host : request->GetHost());
     };
 
+    /// Whether we are writing a CONNECT request to a peer.
+    bool waitingForConnectRequest() const { return connectReqWriting; }
+    /// Whether we are reading a CONNECT response from a peer.
+    bool waitingForConnectResponse() const { return connectRespBuf; }
+    /// Whether we are waiting for the CONNECT request/response exchange with the peer.
+    bool waitingForConnectExchange() const { return waitingForConnectRequest() || waitingForConnectResponse(); }
+
+    /// Whether the client sent a CONNECT request to us.
+    bool clientExpectsConnectResponse() const { return !(request &&
+        (request->flags.interceptTproxy || request->flags.intercepted)); }
+
     class Connection
     {
 
     public:
         Connection() : len (0), buf ((char *)xmalloc(SQUID_TCP_SO_RCVBUF)), size_ptr(NULL) {}
 
         ~Connection();
 
         int bytesWanted(int lower=0, int upper = INT_MAX) const;
         void bytesIn(int const &);
 #if USE_DELAY_POOLS
 
         void setDelayId(DelayId const &);
 #endif
 
         void error(int const xerrno);
         int debugLevelForError(int const xerrno) const;
+        /// handles a non-I/O error associated with this Connection
+        void logicError(const char *errMsg);
         void closeIfOpen();
         void dataSent (size_t amount);
         int len;
         char *buf;
         int64_t *size_ptr;		/* pointer to size in an ConnStateData for logging */
 
         Comm::ConnectionPointer conn;    ///< The currently connected connection.
 
     private:
 #if USE_DELAY_POOLS
 
         DelayId delayId;
 #endif
 
     };
 
     Connection client, server;
     int *status_ptr;		/* pointer to status for logging */
+    MemBuf *connectRespBuf; ///< accumulates peer CONNECT response when we need it
+    bool connectReqWriting; ///< whether we are writing a CONNECT request to a peer
+
     void copyRead(Connection &from, IOCB *completion);
 
 private:
     CBDATA_CLASS2(TunnelStateData);
-    void copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, IOCB *);
+    bool keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to);
+    void copy(size_t len, Connection &from, Connection &to, IOCB *);
+    void handleConnectResponse(const size_t chunkSize);
     void readServer(char *buf, size_t len, comm_err_t errcode, int xerrno);
     void readClient(char *buf, size_t len, comm_err_t errcode, int xerrno);
     void writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno);
     void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno);
+
+    static void ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data);
+    void readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno);
 };
 
 static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n";
 
 static CNCB tunnelConnectDone;
 static ERCB tunnelErrorComplete;
 static CLCB tunnelServerClosed;
 static CLCB tunnelClientClosed;
 static CTCB tunnelTimeout;
 static PSC tunnelPeerSelectComplete;
 static void tunnelConnected(const Comm::ConnectionPointer &server, void *);
 static void tunnelRelayConnectRequest(const Comm::ConnectionPointer &server, void *);
 
 static void
 tunnelServerClosed(const CommCloseCbParams &params)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)params.data;
     debugs(26, 3, HERE << tunnelState->server.conn);
     tunnelState->server.conn = NULL;
 
@@ -179,52 +206,55 @@
 tunnelClientClosed(const CommCloseCbParams &params)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)params.data;
     debugs(26, 3, HERE << tunnelState->client.conn);
     tunnelState->client.conn = NULL;
 
     if (tunnelState->noConnections()) {
         delete tunnelState;
         return;
     }
 
     if (!tunnelState->client.len) {
         tunnelState->server.conn->close();
         return;
     }
 }
 
 TunnelStateData::TunnelStateData() :
         url(NULL),
         request(NULL),
-        status_ptr(NULL)
+        status_ptr(NULL),
+        connectRespBuf(NULL),
+        connectReqWriting(false)
 {
     debugs(26, 3, "TunnelStateData constructed this=" << this);
 }
 
 TunnelStateData::~TunnelStateData()
 {
     debugs(26, 3, "TunnelStateData destructed this=" << this);
     assert(noConnections());
     xfree(url);
     serverDestinations.clean();
     HTTPMSGUNLOCK(request);
+    delete connectRespBuf;
 }
 
 TunnelStateData::Connection::~Connection()
 {
     safe_free(buf);
 }
 
 int
 TunnelStateData::Connection::bytesWanted(int lowerbound, int upperbound) const
 {
 #if USE_DELAY_POOLS
     return delayId.bytesWanted(lowerbound, upperbound);
 #else
 
     return upperbound;
 #endif
 }
 
 void
 TunnelStateData::Connection::bytesIn(int const &count)
@@ -266,41 +296,142 @@
 
 void
 TunnelStateData::readServer(char *buf, size_t len, comm_err_t errcode, int xerrno)
 {
     debugs(26, 3, HERE << server.conn << ", read " << len << " bytes, err=" << errcode);
 
     /*
      * Bail out early on COMM_ERR_CLOSING
      * - close handlers will tidy up for us
      */
 
     if (errcode == COMM_ERR_CLOSING)
         return;
 
     if (len > 0) {
         server.bytesIn(len);
         kb_incr(&(statCounter.server.all.kbytes_in), len);
         kb_incr(&(statCounter.server.other.kbytes_in), len);
     }
 
-    copy (len, errcode, xerrno, server, client, WriteClientDone);
+    if (keepGoingAfterRead(len, errcode, xerrno, server, client))
+        copy(len, server, client, WriteClientDone);
+}
+
+/// Called when we read [a part of] CONNECT response from the peer
+void
+TunnelStateData::readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno)
+{
+    debugs(26, 3, server.conn << ", read " << len << " bytes, err=" << errcode);
+    assert(waitingForConnectResponse());
+
+    if (errcode == COMM_ERR_CLOSING)
+        return;
+
+    if (len > 0) {
+        connectRespBuf->appended(len);
+        server.bytesIn(len);
+        kb_incr(&(statCounter.server.all.kbytes_in), len);
+        kb_incr(&(statCounter.server.other.kbytes_in), len);
+    }
+
+    if (keepGoingAfterRead(len, errcode, xerrno, server, client))
+        handleConnectResponse(len);
+}
+
+/* Read from client side and queue it for writing to the server */
+void
+TunnelStateData::ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data)
+{
+    TunnelStateData *tunnelState = (TunnelStateData *)data;
+    assert (cbdataReferenceValid (tunnelState));
+
+    tunnelState->readConnectResponseDone(buf, len, errcode, xerrno);
+}
+
+/// Parses [possibly incomplete] CONNECT response and reacts to it.
+/// If the tunnel is being closed or more response data is needed, returns false.
+/// Otherwise, the caller should handle the remaining read data, if any.
+void
+TunnelStateData::handleConnectResponse(const size_t chunkSize)
+{
+    assert(waitingForConnectResponse());
+
+    // Ideally, client and server should use MemBuf or better, but current code
+    // never accumulates more than one read when shoveling data (XXX) so it does
+    // not need to deal with MemBuf complexity. To keep it simple, we use a 
+    // dedicated MemBuf for accumulating CONNECT responses. TODO: When shoveling
+    // is optimized, reuse server.buf for CONNEC response accumulation instead.
+
+    /* mimic the basic parts of HttpStateData::processReplyHeader() */
+    HttpReply rep;
+    Http::StatusCode parseErr = Http::scNone;
+    const bool eof = !chunkSize;
+    const bool parsed = rep.parse(connectRespBuf, eof, &parseErr);
+    if (!parsed) {
+        if (parseErr > 0) { // unrecoverable parsing error
+            server.logicError("malformed CONNECT response from peer");
+            return;
+        }
+
+        // need more data
+        assert(!eof);
+        assert(!parseErr);
+
+        if (!connectRespBuf->hasSpace()) {
+            server.logicError("huge CONNECT response from peer");
+            return;
+        }
+
+        // keep reading
+        readConnectResponse();
+        return;
+    }
+
+    // CONNECT response was successfully parsed
+    *status_ptr = rep.sline.status();
+
+    // bail if we did not get an HTTP 200 (Connection Established) response
+    if (rep.sline.status() != Http::scOkay) {
+        server.logicError("unsupported CONNECT response status code");
+        return;
+    }
+
+    if (rep.hdr_sz < connectRespBuf->contentSize()) {
+        // preserve bytes that the server already sent after the CONNECT response
+        server.len = connectRespBuf->contentSize() - rep.hdr_sz;
+        memcpy(server.buf, connectRespBuf->content()+rep.hdr_sz, server.len);
+    } else {
+        // reset; delay pools were using this field to throttle CONNECT response
+        server.len = 0;
+    }
+
+    delete connectRespBuf;
+    connectRespBuf = NULL;
+    connectExchangeCheckpoint();
+}
+
+void
+TunnelStateData::Connection::logicError(const char *errMsg)
+{
+    debugs(50, 3, conn << " closing on error: " << errMsg);
+    conn->close();
 }
 
 void
 TunnelStateData::Connection::error(int const xerrno)
 {
     /* XXX fixme xstrerror and xerrno... */
     errno = xerrno;
 
     debugs(50, debugLevelForError(xerrno), HERE << conn << ": read/write failure: " << xstrerror());
 
     if (!ignoreErrno(xerrno))
         conn->close();
 }
 
 /* Read from client side and queue it for writing to the server */
 void
 TunnelStateData::ReadClient(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));
@@ -309,85 +440,96 @@
 }
 
 void
 TunnelStateData::readClient(char *buf, size_t len, comm_err_t errcode, int xerrno)
 {
     debugs(26, 3, HERE << client.conn << ", read " << len << " bytes, err=" << errcode);
 
     /*
      * Bail out early on COMM_ERR_CLOSING
      * - close handlers will tidy up for us
      */
 
     if (errcode == COMM_ERR_CLOSING)
         return;
 
     if (len > 0) {
         client.bytesIn(len);
         kb_incr(&(statCounter.client_http.kbytes_in), len);
     }
 
-    copy (len, errcode, xerrno, client, server, WriteServerDone);
+    if (keepGoingAfterRead(len, errcode, xerrno, client, server))
+        copy(len, client, server, WriteServerDone);
 }
 
-void
-TunnelStateData::copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, IOCB *completion)
+/// Updates state after reading from client or server.
+/// Returns whether the caller should use the data just read.
+bool
+TunnelStateData::keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to)
 {
     debugs(26, 3, HERE << "from={" << from.conn << "}, to={" << to.conn << "}");
 
     /* I think this is to prevent free-while-in-a-callback behaviour
      * - RBC 20030229
      * from.conn->close() / to.conn->close() done here trigger close callbacks which may free TunnelStateData
      */
     const CbcPointer<TunnelStateData> safetyLock(this);
 
     /* Bump the source connection read timeout on any activity */
     if (Comm::IsConnOpen(from.conn)) {
         AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                          CommTimeoutCbPtrFun(tunnelTimeout, this));
         commSetConnTimeout(from.conn, Config.Timeout.read, timeoutCall);
     }
 
     /* Bump the dest connection read timeout on any activity */
     /* see Bug 3659: tunnels can be weird, with very long one-way transfers */
     if (Comm::IsConnOpen(to.conn)) {
         AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                          CommTimeoutCbPtrFun(tunnelTimeout, this));
         commSetConnTimeout(to.conn, Config.Timeout.read, timeoutCall);
     }
 
     if (errcode)
         from.error (xerrno);
     else if (len == 0 || !Comm::IsConnOpen(to.conn)) {
         debugs(26, 3, HERE << "Nothing to write or client gone. Terminate the tunnel.");
         from.conn->close();
 
         /* Only close the remote end if we've finished queueing data to it */
         if (from.len == 0 && Comm::IsConnOpen(to.conn) ) {
             to.conn->close();
         }
     } else if (cbdataReferenceValid(this)) {
+        return true;
+    }
+
+    return false;
+}
+
+void
+TunnelStateData::copy(size_t len, Connection &from, Connection &to, IOCB *completion)
+{
         debugs(26, 3, HERE << "Schedule Write");
         AsyncCall::Pointer call = commCbCall(5,5, "TunnelBlindCopyWriteHandler",
                                              CommIoCbPtrFun(completion, this));
         Comm::Write(to.conn, from.buf, len, call, NULL);
-    }
 }
 
 /* Writes data from the client buffer to the server side */
 void
 TunnelStateData::WriteServerDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));
 
     tunnelState->writeServerDone(buf, len, flag, xerrno);
 }
 
 void
 TunnelStateData::writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno)
 {
     debugs(26, 3, HERE  << server.conn << ", " << len << " bytes written, flag=" << flag);
 
     /* Error? */
     if (flag != COMM_OK) {
         if (flag != COMM_ERR_CLOSING) {
@@ -493,84 +635,131 @@
     tunnelState->client.closeIfOpen();
     tunnelState->server.closeIfOpen();
 }
 
 void
 TunnelStateData::Connection::closeIfOpen()
 {
     if (Comm::IsConnOpen(conn))
         conn->close();
 }
 
 void
 TunnelStateData::copyRead(Connection &from, IOCB *completion)
 {
     assert(from.len == 0);
     AsyncCall::Pointer call = commCbCall(5,4, "TunnelBlindCopyReadHandler",
                                          CommIoCbPtrFun(completion, this));
     comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), call);
 }
 
+void
+TunnelStateData::readConnectResponse()
+{
+    assert(waitingForConnectResponse());
+
+    AsyncCall::Pointer call = commCbCall(5,4, "readConnectResponseDone",
+                                         CommIoCbPtrFun(ReadConnectResponseDone, this));
+    comm_read(server.conn, connectRespBuf->space(),
+              server.bytesWanted(1, connectRespBuf->spaceSize()), call);
+}
+
 /**
  * Set the HTTP status for this request and sets the read handlers for client
  * and server side connections.
  */
 static void
 tunnelStartShoveling(TunnelStateData *tunnelState)
 {
+    assert(!tunnelState->waitingForConnectExchange());
     *tunnelState->status_ptr = Http::scOkay;
     if (cbdataReferenceValid(tunnelState)) {
+        if (!tunnelState->server.len)
         tunnelState->copyRead(tunnelState->server, TunnelStateData::ReadServer);
+        else
+            tunnelState->copy(tunnelState->server.len, tunnelState->server, tunnelState->client, TunnelStateData::WriteClientDone);
         tunnelState->copyRead(tunnelState->client, TunnelStateData::ReadClient);
     }
 }
 
 /**
  * All the pieces we need to write to client and/or server connection
  * have been written.
  * Call the tunnelStartShoveling to start the blind pump.
  */
 static void
 tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     debugs(26, 3, HERE << conn << ", flag=" << flag);
 
     if (flag != COMM_OK) {
         *tunnelState->status_ptr = Http::scInternalServerError;
         tunnelErrorComplete(conn->fd, data, 0);
         return;
     }
 
     tunnelStartShoveling(tunnelState);
 }
 
+/// Called when we are done writing CONNECT request to a peer.
+static void
+tunnelConnectReqWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
+{
+    TunnelStateData *tunnelState = (TunnelStateData *)data;
+    debugs(26, 3, conn << ", flag=" << flag);
+    assert(tunnelState->waitingForConnectRequest());
+
+    if (flag != COMM_OK) {
+        *tunnelState->status_ptr = Http::scInternalServerError;
+        tunnelErrorComplete(conn->fd, data, 0);
+        return;
+    }
+
+    tunnelState->connectReqWriting = false;
+    tunnelState->connectExchangeCheckpoint();
+}
+
+void
+TunnelStateData::connectExchangeCheckpoint()
+{
+    if (waitingForConnectResponse()) {
+        debugs(26, 5, "still reading CONNECT response on " << server.conn);
+    } else if (waitingForConnectRequest()) {
+        debugs(26, 5, "still writing CONNECT request on " << server.conn);
+    } else {
+        assert(!waitingForConnectExchange());
+        debugs(26, 3, "done with CONNECT exchange on " << server.conn);
+        tunnelConnected(server.conn, this);
+    }
+}
+
 /*
  * handle the write completion from a proxy request to an upstream origin
  */
 static void
 tunnelConnected(const Comm::ConnectionPointer &server, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     debugs(26, 3, HERE << server << ", tunnelState=" << tunnelState);
 
-    if (tunnelState->request && (tunnelState->request->flags.interceptTproxy || tunnelState->request->flags.intercepted))
+    if (!tunnelState->clientExpectsConnectResponse())
         tunnelStartShoveling(tunnelState); // ssl-bumped connection, be quiet
     else {
         AsyncCall::Pointer call = commCbCall(5,5, "tunnelConnectedWriteDone",
                                              CommIoCbPtrFun(tunnelConnectedWriteDone, tunnelState));
         Comm::Write(tunnelState->client.conn, conn_established, strlen(conn_established), call, NULL);
     }
 }
 
 static void
 tunnelErrorComplete(int fd/*const Comm::ConnectionPointer &*/, void *data, size_t)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     debugs(26, 3, HERE << "FD " << fd);
     assert(tunnelState != NULL);
     /* temporary lock to save our own feets (comm_close -> tunnelClientClosed -> Free) */
     CbcPointer<TunnelStateData> safetyLock(tunnelState);
 
     if (Comm::IsConnOpen(tunnelState->client.conn))
         tunnelState->client.conn->close();
 
@@ -703,63 +892,84 @@
     tunnelState->client.conn = http->getConn()->clientConnection;
 
     comm_add_close_handler(tunnelState->client.conn->fd,
                            tunnelClientClosed,
                            tunnelState);
 
     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                      CommTimeoutCbPtrFun(tunnelTimeout, tunnelState));
     commSetConnTimeout(tunnelState->client.conn, Config.Timeout.lifetime, timeoutCall);
 
     peerSelect(&(tunnelState->serverDestinations), request,
                NULL,
                tunnelPeerSelectComplete,
                tunnelState);
 }
 
 static void
 tunnelRelayConnectRequest(const Comm::ConnectionPointer &srv, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
+    assert(!tunnelState->waitingForConnectExchange());
     HttpHeader hdr_out(hoRequest);
     Packer p;
     HttpStateFlags flags;
     debugs(26, 3, HERE << srv << ", tunnelState=" << tunnelState);
     memset(&flags, '\0', sizeof(flags));
     flags.proxying = tunnelState->request->flags.proxying;
     MemBuf mb;
     mb.init();
     mb.Printf("CONNECT %s HTTP/1.1\r\n", tunnelState->url);
     HttpStateData::httpBuildRequestHeader(tunnelState->request,
                                           NULL,			/* StoreEntry */
                                           NULL,			/* AccessLogEntry */
                                           &hdr_out,
                                           flags);			/* flags */
     packerToMemInit(&p, &mb);
     hdr_out.packInto(&p);
     hdr_out.clean();
     packerClean(&p);
     mb.append("\r\n", 2);
 
+    if (tunnelState->clientExpectsConnectResponse()) {
+    // hack: blindly tunnel peer response (to our CONNECT request) to the client as ours.
     AsyncCall::Pointer writeCall = commCbCall(5,5, "tunnelConnectedWriteDone",
                                    CommIoCbPtrFun(tunnelConnectedWriteDone, tunnelState));
     Comm::Write(srv, &mb, writeCall);
+    } else {
+        // we have to eat the connect response from the peer (so that the client
+        // does not see it) and only then start shoveling data to the client
+        AsyncCall::Pointer writeCall = commCbCall(5,5, "tunnelConnectReqWriteDone",
+                                       CommIoCbPtrFun(tunnelConnectReqWriteDone,
+                                       tunnelState));
+        Comm::Write(srv, &mb, writeCall);
+        tunnelState->connectReqWriting = true;
+
+        tunnelState->connectRespBuf = new MemBuf;
+        // SQUID_TCP_SO_RCVBUF: we should not accumulate more than regular I/O buffer
+        // can hold since any CONNECT response leftovers have to fit into server.buf.
+        // 2*SQUID_TCP_SO_RCVBUF: HttpMsg::parse() zero-terminates, which uses space.
+        tunnelState->connectRespBuf->init(SQUID_TCP_SO_RCVBUF, 2*SQUID_TCP_SO_RCVBUF);
+        tunnelState->readConnectResponse();
+
+        assert(tunnelState->waitingForConnectExchange());
+    }
 
     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                      CommTimeoutCbPtrFun(tunnelTimeout, tunnelState));
     commSetConnTimeout(srv, Config.Timeout.read, timeoutCall);
 }
 
 static void
 tunnelPeerSelectComplete(Comm::ConnectionList *peer_paths, ErrorState *err, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
 
     if (peer_paths == NULL || peer_paths->size() < 1) {
         debugs(26, 3, HERE << "No paths found. Aborting CONNECT");
         if (!err) {
             err = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, tunnelState->request);
         }
         *tunnelState->status_ptr = err->httpStatus;
         err->callback = tunnelErrorComplete;
         err->callback_data = tunnelState;
         errorSend(tunnelState->client.conn, err);

Reply via email to