This patch adds a Comm::Write API for accepting SBuf output buffers.

Unlike the MemBuf API, it does not deallocate the backing store once written.

Unlike the char* API, the buffer can be appended to before write(2) has
completed, and the appended bytes will be handled as part of the original
write request.

All other behaviour is identical across the three Comm::Write APIs.


To exercise and test the new API, the main tunnel.cc I/O
"data pipe" buffers are converted to SBuf. Initial testing with a few
hundred CONNECT requests in real traffic shows no issues. We even get
rid of one potential data copy when passing the initial client read
buffer in from ConnStateData.
 NP: at this point no effort is made to take advantage of the
early-append possibility.
 The buffers behind the setup request/response handling are also left as
MemBuf because the HttpMsg::parse() and packerToMemInit() APIs lack SBuf
support.

Amos
=== modified file 'src/comm/Write.cc'
--- src/comm/Write.cc   2014-02-21 10:46:19 +0000
+++ src/comm/Write.cc   2014-05-11 15:03:02 +0000
@@ -1,118 +1,145 @@
 #include "squid.h"
 #include "comm/Connection.h"
 #include "comm/IoCallback.h"
 #include "comm/Write.h"
 #include "fd.h"
 #include "fde.h"
 #include "globals.h"
 #include "MemBuf.h"
 #include "profiler/Profiler.h"
+#include "SBuf.h"
 #include "SquidTime.h"
 #include "StatCounters.h"
 #if USE_DELAY_POOLS
 #include "ClientInfo.h"
 #endif
 
 #include <cerrno>
 
 void
+Comm::Write(const Comm::ConnectionPointer &conn, SBuf &sb, AsyncCall::Pointer 
&callback)
+{
+    debugs(5, 5, conn << ": sz " << sb.length() << ": asynCall " << callback);
+
+    /* Make sure we are open, not closing, and not writing */
+    assert(fd_table[conn->fd].flags.open);
+    assert(!fd_table[conn->fd].closing());
+    Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd);
+    assert(!ccb->active());
+
+    fd_table[conn->fd].writeStart = squid_curtime;
+    ccb->conn = conn;
+    /* Queue the write */
+    ccb->setCallback(IOCB_WRITE, callback, NULL, NULL, sb.length());
+    ccb->buf2 = &sb;
+    ccb->selectOrQueueWrite();
+}
+
+/// \deprecated use SBuf for I/O buffer instead
+void
 Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, 
AsyncCall::Pointer &callback)
 {
     Comm::Write(conn, mb->buf, mb->size, callback, mb->freeFunc());
 }
 
+/// \deprecated use SBuf for I/O buffer instead
 void
 Comm::Write(const Comm::ConnectionPointer &conn, const char *buf, int size, 
AsyncCall::Pointer &callback, FREE * free_func)
 {
     debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback);
 
     /* Make sure we are open, not closing, and not writing */
     assert(fd_table[conn->fd].flags.open);
     assert(!fd_table[conn->fd].closing());
     Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd);
     assert(!ccb->active());
 
     fd_table[conn->fd].writeStart = squid_curtime;
     ccb->conn = conn;
     /* Queue the write */
     ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size);
     ccb->selectOrQueueWrite();
 }
 
 /** Write to FD.
  * This function is used by the lowest level of IO loop which only has access 
to FD numbers.
  * We have to use the comm iocb_table to map FD numbers to waiting data and 
Comm::Connections.
  * Once the write has been concluded we schedule the waiting call with 
success/fail results.
  */
 void
 Comm::HandleWrite(int fd, void *data)
 {
     Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data);
     int len = 0;
     int nleft;
 
     assert(state->conn != NULL && state->conn->fd == fd);
 
     PROF_start(commHandleWrite);
-    debugs(5, 5, HERE << state->conn << ": off " <<
-           (long int) state->offset << ", sz " << (long int) state->size << 
".");
 
-    nleft = state->size - state->offset;
+    debugs(5, 5, state->conn << ": off " << state->offset << ", sz " << 
state->size);
+    if (state->buf2)
+        nleft = state->buf2->length();
+    else
+        nleft = state->size - state->offset;
 
 #if USE_DELAY_POOLS
     ClientInfo * clientInfo=fd_table[fd].clientInfo;
 
     if (clientInfo && !clientInfo->writeLimitingActive)
         clientInfo = NULL; // we only care about quota limits here
 
     if (clientInfo) {
         assert(clientInfo->selectWaiting);
         clientInfo->selectWaiting = false;
 
         assert(clientInfo->hasQueue());
         assert(clientInfo->quotaPeekFd() == fd);
         clientInfo->quotaDequeue(); // we will write or requeue below
 
         if (nleft > 0) {
             const int quota = clientInfo->quotaForDequed();
             if (!quota) {  // if no write quota left, queue this fd
                 state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
                 clientInfo->kickQuotaQueue();
                 PROF_stop(commHandleWrite);
                 return;
             }
 
             const int nleft_corrected = min(nleft, quota);
             if (nleft != nleft_corrected) {
                 debugs(5, 5, HERE << state->conn << " writes only " <<
                        nleft_corrected << " out of " << nleft);
                 nleft = nleft_corrected;
             }
 
         }
     }
 #endif /* USE_DELAY_POOLS */
 
     /* actually WRITE data */
-    len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
-    debugs(5, 5, HERE << "write() returns " << len);
+    if (state->buf2)
+        len = FD_WRITE_METHOD(fd, state->buf2->rawContent(), nleft);
+    else
+        len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
+    debugs(5, 5, "write() returns " << len);
 
 #if USE_DELAY_POOLS
     if (clientInfo) {
         if (len > 0) {
             /* we wrote data - drain them from bucket */
             clientInfo->bucketSize -= len;
             if (clientInfo->bucketSize < 0.0) {
                 debugs(5, DBG_IMPORTANT, HERE << "drained too much"); // 
should not happen
                 clientInfo->bucketSize = 0;
             }
         }
 
         // even if we wrote nothing, we were served; give others a chance
         clientInfo->kickQuotaQueue();
     }
 #endif /* USE_DELAY_POOLS */
 
     fd_bytes(fd, len, FD_WRITE);
     ++statCounter.syscalls.sock.writes;
     // After each successful partial write,
@@ -124,31 +151,33 @@
         /* We're done */
         if (nleft != 0)
             debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: 
connection closed with " << nleft << " bytes remaining.");
 
         state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
     } else if (len < 0) {
         /* An error */
         if (fd_table[fd].flags.socket_eof) {
             debugs(50, 2, HERE << "FD " << fd << " write failure: " << 
xstrerror() << ".");
             state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
         } else if (ignoreErrno(errno)) {
             debugs(50, 9, HERE << "FD " << fd << " write failure: " << 
xstrerror() << ".");
             state->selectOrQueueWrite();
         } else {
             debugs(50, 2, HERE << "FD " << fd << " write failure: " << 
xstrerror() << ".");
             state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
         }
     } else {
         /* A successful write, continue */
         state->offset += len;
+        if (state->buf2)
+            state->buf2->consume(len);
 
         if (state->offset < state->size) {
             /* Not done, reinstall the write handler and write some more */
             state->selectOrQueueWrite();
         } else {
             state->finish(nleft ? COMM_OK : COMM_ERROR, errno);
         }
     }
 
     PROF_stop(commHandleWrite);
 }

=== modified file 'src/comm/Write.h'
--- src/comm/Write.h    2012-08-14 11:53:07 +0000
+++ src/comm/Write.h    2014-05-11 14:47:10 +0000
@@ -1,34 +1,44 @@
 #ifndef _SQUID_COMM_IOWRITE_H
 #define _SQUID_COMM_IOWRITE_H
 
 #include "base/AsyncCall.h"
 #include "comm/forward.h"
 #include "typedefs.h"
 
 class MemBuf;
+class SBuf;
+
 namespace Comm
 {
 
 /**
  * Queue a write. callback is scheduled when the write
  * completes, on error, or on file descriptor close.
  *
  * free_func is used to free the passed buffer when the write has completed.
  */
 void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, 
AsyncCall::Pointer &callback, FREE *free_func);
 
 /**
  * Queue a write. callback is scheduled when the write
  * completes, on error, or on file descriptor close.
  */
 void Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer 
&callback);
 
+/**
+ * Queue a write of an SBuf's contents. The SBuf content is consume()'d as it 
is written.
+ * The callback is scheduled when the SBuf is emptied, on error, or on file 
descriptor close.
+ * If the SBuf is appended to while write(2) calls are ongoing, the additional bytes 
will also be
+ * written before the callback is scheduled.
+ */
+void Write(const Comm::ConnectionPointer &conn, SBuf &sb, AsyncCall::Pointer 
&callback);
+
 /// Cancel the write pending on FD. No action if none pending.
 void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason);
 
 // callback handler to process an FD which is available for writing.
 extern PF HandleWrite;
 
 } // namespace Comm
 
 #endif /* _SQUID_COMM_IOWRITE_H */

=== modified file 'src/tunnel.cc'
--- src/tunnel.cc       2014-05-07 14:40:05 +0000
+++ src/tunnel.cc       2014-05-11 16:56:00 +0000
@@ -69,108 +69,96 @@
  * then shuffling binary data between the resulting FD pair.
  */
 /*
  * TODO 1: implement a read/write API on ConnStateData to send/receive blocks
  * of pre-formatted data. Then we can use that as the client side of the tunnel
  * instead of re-implementing it here and occasionally getting the 
ConnStateData
  * read/write state wrong.
  *
  * TODO 2: then convert this into a AsyncJob, possibly a child of 'Server'
  */
 class TunnelStateData
 {
 
 public:
     TunnelStateData();
     ~TunnelStateData();
     TunnelStateData(const TunnelStateData &); // do not implement
     TunnelStateData &operator =(const TunnelStateData &); // do not implement
 
     class Connection;
-    static void ReadClient(const Comm::ConnectionPointer &, char *buf, size_t 
len, comm_err_t errcode, int xerrno, void *data);
-    static void ReadServer(const Comm::ConnectionPointer &, char *buf, size_t 
len, comm_err_t errcode, int xerrno, void *data);
-    static void WriteClientDone(const Comm::ConnectionPointer &, char *buf, 
size_t len, comm_err_t flag, int xerrno, void *data);
-    static void WriteServerDone(const Comm::ConnectionPointer &, char *buf, 
size_t len, comm_err_t flag, int xerrno, void *data);
+    static void ReadClient(const Comm::ConnectionPointer &, char *, size_t 
len, comm_err_t errcode, int xerrno, void *data);
+    static void ReadServer(const Comm::ConnectionPointer &, char *, size_t 
len, comm_err_t errcode, int xerrno, void *data);
+    static void WriteClientDone(const Comm::ConnectionPointer &, char *, 
size_t len, comm_err_t flag, int xerrno, void *data);
+    static void WriteServerDone(const Comm::ConnectionPointer &, char *, 
size_t len, comm_err_t flag, int xerrno, void *data);
 
     /// Starts reading peer response to our CONNECT request.
     void readConnectResponse();
 
     /// Called when we may be done handling a CONNECT exchange with the peer.
     void connectExchangeCheckpoint();
 
     bool noConnections() const;
     char *url;
     CbcPointer<ClientHttpRequest> http;
     HttpRequest::Pointer request;
     AccessLogEntryPointer al;
     Comm::ConnectionList serverDestinations;
 
     const char * getHost() const {
         return (server.conn != NULL && server.conn->getPeer() ? 
server.conn->getPeer()->host : request->GetHost());
     };
 
     /// Whether we are writing a CONNECT request to a peer.
     bool waitingForConnectRequest() const { return connectReqWriting; }
     /// Whether we are reading a CONNECT response from a peer.
     bool waitingForConnectResponse() const { return connectRespBuf; }
     /// Whether we are waiting for the CONNECT request/response exchange with 
the peer.
     bool waitingForConnectExchange() const { return waitingForConnectRequest() 
|| waitingForConnectResponse(); }
 
     /// Whether the client sent a CONNECT request to us.
     bool clientExpectsConnectResponse() const {
         return !(request != NULL &&
                  (request->flags.interceptTproxy || 
request->flags.intercepted));
     }
 
     class Connection
     {
 
     public:
-        Connection() : len (0), buf ((char *)xmalloc(SQUID_TCP_SO_RCVBUF)), 
size_ptr(NULL) {}
-
-        ~Connection();
+        Connection() : size_ptr(NULL) {}
 
         int bytesWanted(int lower=0, int upper = INT_MAX) const;
-        void bytesIn(int const &);
-#if USE_DELAY_POOLS
-
-        void setDelayId(DelayId const &);
-#endif
-
         void error(int const xerrno);
         int debugLevelForError(int const xerrno) const;
         /// handles a non-I/O error associated with this Connection
         void logicError(const char *errMsg);
         void closeIfOpen();
         void dataSent (size_t amount);
-        int len;
-        char *buf;
+        SBuf buf;
         int64_t *size_ptr;             /* pointer to size in an ConnStateData 
for logging */
 
         Comm::ConnectionPointer conn;    ///< The currently connected 
connection.
 
-    private:
 #if USE_DELAY_POOLS
-
         DelayId delayId;
 #endif
-
     };
 
     Connection client, server;
     int *status_ptr;           /* pointer to status for logging */
     MemBuf *connectRespBuf; ///< accumulates peer CONNECT response when we 
need it
     bool connectReqWriting; ///< whether we are writing a CONNECT request to a 
peer
 
     void copyRead(Connection &from, IOCB *completion);
 
     /// continue to set up connection to a peer, going async for SSL peers
     void connectToPeer();
 
 private:
 #if USE_OPENSSL
     /// Gives PeerConnector access to Answer in the TunnelStateData callback 
dialer.
     class MyAnswerDialer: public CallDialer, public 
Ssl::PeerConnector::CbDialer
     {
     public:
         typedef void (TunnelStateData::*Method)(Ssl::PeerConnectorAnswer &);
 
@@ -181,208 +169,196 @@
         virtual bool canDial(AsyncCall &call) { return tunnel_.valid(); }
         void dial(AsyncCall &call) { ((&(*tunnel_))->*method_)(answer_); }
         virtual void print(std::ostream &os) const {
             os << '(' << tunnel_.get() << ", " << answer_ << ')';
         }
 
         /* Ssl::PeerConnector::CbDialer API */
         virtual Ssl::PeerConnectorAnswer &answer() { return answer_; }
 
     private:
         Method method_;
         CbcPointer<TunnelStateData> tunnel_;
         Ssl::PeerConnectorAnswer answer_;
     };
 
     void connectedToPeer(Ssl::PeerConnectorAnswer &answer);
 #endif
 
     CBDATA_CLASS2(TunnelStateData);
     bool keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, 
Connection &from, Connection &to);
-    void copy(size_t len, Connection &from, Connection &to, IOCB *);
+    void copy(Connection &from, Connection &to, IOCB *);
     void handleConnectResponse(const size_t chunkSize);
-    void readServer(char *buf, size_t len, comm_err_t errcode, int xerrno);
-    void readClient(char *buf, size_t len, comm_err_t errcode, int xerrno);
-    void writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno);
-    void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno);
+    void readServer(size_t len, comm_err_t errcode, int xerrno);
+    void readClient(size_t len, comm_err_t errcode, int xerrno);
+    void writeClientDone(size_t len, comm_err_t flag, int xerrno);
+    void writeServerDone(size_t len, comm_err_t flag, int xerrno);
 
     static void ReadConnectResponseDone(const Comm::ConnectionPointer &, char 
*buf, size_t len, comm_err_t errcode, int xerrno, void *data);
     void readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, 
int xerrno);
 };
 
 static const char *const conn_established = "HTTP/1.1 200 Connection 
established\r\n\r\n";
 
 static CNCB tunnelConnectDone;
 static ERCB tunnelErrorComplete;
 static CLCB tunnelServerClosed;
 static CLCB tunnelClientClosed;
 static CTCB tunnelTimeout;
 static PSC tunnelPeerSelectComplete;
 static void tunnelConnected(const Comm::ConnectionPointer &server, void *);
 static void tunnelRelayConnectRequest(const Comm::ConnectionPointer &server, 
void *);
 
 static void
 tunnelServerClosed(const CommCloseCbParams &params)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)params.data;
     debugs(26, 3, HERE << tunnelState->server.conn);
     tunnelState->server.conn = NULL;
 
     if (tunnelState->noConnections()) {
         delete tunnelState;
         return;
     }
 
-    if (!tunnelState->server.len) {
+    if (tunnelState->server.buf.isEmpty()) {
         tunnelState->client.conn->close();
         return;
     }
 }
 
 static void
 tunnelClientClosed(const CommCloseCbParams &params)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)params.data;
     debugs(26, 3, HERE << tunnelState->client.conn);
     tunnelState->client.conn = NULL;
 
     if (tunnelState->noConnections()) {
         delete tunnelState;
         return;
     }
 
-    if (!tunnelState->client.len) {
+    if (tunnelState->client.buf.isEmpty()) {
         tunnelState->server.conn->close();
         return;
     }
 }
 
 TunnelStateData::TunnelStateData() :
         url(NULL),
         http(),
         request(NULL),
         status_ptr(NULL),
         connectRespBuf(NULL),
         connectReqWriting(false)
 {
     debugs(26, 3, "TunnelStateData constructed this=" << this);
 }
 
 TunnelStateData::~TunnelStateData()
 {
     debugs(26, 3, "TunnelStateData destructed this=" << this);
     assert(noConnections());
     xfree(url);
     serverDestinations.clear();
     delete connectRespBuf;
 }
 
-TunnelStateData::Connection::~Connection()
-{
-    safe_free(buf);
-}
-
 int
 TunnelStateData::Connection::bytesWanted(int lowerbound, int upperbound) const
 {
 #if USE_DELAY_POOLS
     return delayId.bytesWanted(lowerbound, upperbound);
 #else
 
     return upperbound;
 #endif
 }
 
-void
-TunnelStateData::Connection::bytesIn(int const &count)
-{
-    debugs(26, 3, HERE << "len=" << len << " + count=" << count);
-#if USE_DELAY_POOLS
-    delayId.bytesIn(count);
-#endif
-
-    len += count;
-}
-
 int
 TunnelStateData::Connection::debugLevelForError(int const xerrno) const
 {
 #ifdef ECONNRESET
 
     if (xerrno == ECONNRESET)
         return 2;
 
 #endif
 
     if (ignoreErrno(xerrno))
         return 3;
 
     return 1;
 }
 
 /* Read from server side and queue it for writing to the client */
 void
-TunnelStateData::ReadServer(const Comm::ConnectionPointer &c, char *buf, 
size_t len, comm_err_t errcode, int xerrno, void *data)
+TunnelStateData::ReadServer(const Comm::ConnectionPointer &c, char *, size_t 
len, comm_err_t errcode, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert(cbdataReferenceValid(tunnelState));
     debugs(26, 3, HERE << c);
 
-    tunnelState->readServer(buf, len, errcode, xerrno);
+    tunnelState->readServer(len, errcode, xerrno);
 }
 
 void
-TunnelStateData::readServer(char *buf, size_t len, comm_err_t errcode, int 
xerrno)
+TunnelStateData::readServer(size_t len, comm_err_t errcode, int xerrno)
 {
     debugs(26, 3, HERE << server.conn << ", read " << len << " bytes, err=" << 
errcode);
 
     /*
      * Bail out early on COMM_ERR_CLOSING
      * - close handlers will tidy up for us
      */
 
     if (errcode == COMM_ERR_CLOSING)
         return;
 
     if (len > 0) {
-        server.bytesIn(len);
+#if USE_DELAY_POOLS
+        server.delayId.bytesIn(len);
+#endif
         kb_incr(&(statCounter.server.all.kbytes_in), len);
         kb_incr(&(statCounter.server.other.kbytes_in), len);
     }
 
     if (keepGoingAfterRead(len, errcode, xerrno, server, client))
-        copy(len, server, client, WriteClientDone);
+        copy(server, client, WriteClientDone);
 }
 
 /// Called when we read [a part of] CONNECT response from the peer
 void
 TunnelStateData::readConnectResponseDone(char *buf, size_t len, comm_err_t 
errcode, int xerrno)
 {
     debugs(26, 3, server.conn << ", read " << len << " bytes, err=" << 
errcode);
     assert(waitingForConnectResponse());
 
     if (errcode == COMM_ERR_CLOSING)
         return;
 
     if (len > 0) {
         connectRespBuf->appended(len);
-        server.bytesIn(len);
+#if USE_DELAY_POOLS
+        server.delayId.bytesIn(len);
+#endif
         kb_incr(&(statCounter.server.all.kbytes_in), len);
         kb_incr(&(statCounter.server.other.kbytes_in), len);
     }
 
     if (keepGoingAfterRead(len, errcode, xerrno, server, client))
         handleConnectResponse(len);
 }
 
 /* Read from client side and queue it for writing to the server */
 void
 TunnelStateData::ReadConnectResponseDone(const Comm::ConnectionPointer &, char 
*buf, size_t len, comm_err_t errcode, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));
 
     tunnelState->readConnectResponseDone(buf, len, errcode, xerrno);
 }
 
 /// Parses [possibly incomplete] CONNECT response and reacts to it.
 /// If the tunnel is being closed or more response data is needed, returns 
false.
@@ -417,230 +393,226 @@
             server.logicError("huge CONNECT response from peer");
             return;
         }
 
         // keep reading
         readConnectResponse();
         return;
     }
 
     // CONNECT response was successfully parsed
     *status_ptr = rep.sline.status();
 
     // bail if we did not get an HTTP 200 (Connection Established) response
     if (rep.sline.status() != Http::scOkay) {
         server.logicError("unsupported CONNECT response status code");
         return;
     }
 
     if (rep.hdr_sz < connectRespBuf->contentSize()) {
         // preserve bytes that the server already sent after the CONNECT 
response
-        server.len = connectRespBuf->contentSize() - rep.hdr_sz;
-        memcpy(server.buf, connectRespBuf->content()+rep.hdr_sz, server.len);
-    } else {
-        // reset; delay pools were using this field to throttle CONNECT 
response
-        server.len = 0;
+        server.buf.append(connectRespBuf->content()+rep.hdr_sz, 
connectRespBuf->contentSize() - rep.hdr_sz);
     }
 
     delete connectRespBuf;
     connectRespBuf = NULL;
     connectExchangeCheckpoint();
 }
 
 void
 TunnelStateData::Connection::logicError(const char *errMsg)
 {
     debugs(50, 3, conn << " closing on error: " << errMsg);
     conn->close();
 }
 
 void
 TunnelStateData::Connection::error(int const xerrno)
 {
     /* XXX fixme xstrerror and xerrno... */
     errno = xerrno;
 
     debugs(50, debugLevelForError(xerrno), HERE << conn << ": read/write 
failure: " << xstrerror());
 
     if (!ignoreErrno(xerrno))
         conn->close();
 }
 
 /* Read from client side and queue it for writing to the server */
 void
-TunnelStateData::ReadClient(const Comm::ConnectionPointer &, char *buf, size_t 
len, comm_err_t errcode, int xerrno, void *data)
+TunnelStateData::ReadClient(const Comm::ConnectionPointer &, char *, size_t 
len, comm_err_t errcode, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));
 
-    tunnelState->readClient(buf, len, errcode, xerrno);
+    tunnelState->readClient(len, errcode, xerrno);
 }
 
 void
-TunnelStateData::readClient(char *buf, size_t len, comm_err_t errcode, int 
xerrno)
+TunnelStateData::readClient(size_t len, comm_err_t errcode, int xerrno)
 {
     debugs(26, 3, HERE << client.conn << ", read " << len << " bytes, err=" << 
errcode);
 
     /*
      * Bail out early on COMM_ERR_CLOSING
      * - close handlers will tidy up for us
      */
 
     if (errcode == COMM_ERR_CLOSING)
         return;
 
     if (len > 0) {
-        client.bytesIn(len);
+#if USE_DELAY_POOLS
+        client.delayId.bytesIn(len);
+#endif
         kb_incr(&(statCounter.client_http.kbytes_in), len);
     }
 
     if (keepGoingAfterRead(len, errcode, xerrno, client, server))
-        copy(len, client, server, WriteServerDone);
+        copy(client, server, WriteServerDone);
 }
 
 /// Updates state after reading from client or server.
 /// Returns whether the caller should use the data just read.
 bool
 TunnelStateData::keepGoingAfterRead(size_t len, comm_err_t errcode, int 
xerrno, Connection &from, Connection &to)
 {
     debugs(26, 3, HERE << "from={" << from.conn << "}, to={" << to.conn << 
"}");
 
     /* I think this is to prevent free-while-in-a-callback behaviour
      * - RBC 20030229
      * from.conn->close() / to.conn->close() done here trigger close callbacks 
which may free TunnelStateData
      */
     const CbcPointer<TunnelStateData> safetyLock(this);
 
     /* Bump the source connection read timeout on any activity */
     if (Comm::IsConnOpen(from.conn)) {
         AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                          CommTimeoutCbPtrFun(tunnelTimeout, 
this));
         commSetConnTimeout(from.conn, Config.Timeout.read, timeoutCall);
     }
 
     /* Bump the dest connection read timeout on any activity */
     /* see Bug 3659: tunnels can be weird, with very long one-way transfers */
     if (Comm::IsConnOpen(to.conn)) {
         AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                          CommTimeoutCbPtrFun(tunnelTimeout, 
this));
         commSetConnTimeout(to.conn, Config.Timeout.read, timeoutCall);
     }
 
     if (errcode)
         from.error (xerrno);
     else if (len == 0 || !Comm::IsConnOpen(to.conn)) {
         debugs(26, 3, HERE << "Nothing to write or client gone. Terminate the 
tunnel.");
         from.conn->close();
 
         /* Only close the remote end if we've finished queueing data to it */
-        if (from.len == 0 && Comm::IsConnOpen(to.conn) ) {
+        if (from.buf.isEmpty() && Comm::IsConnOpen(to.conn) ) {
             to.conn->close();
         }
     } else if (cbdataReferenceValid(this)) {
         return true;
     }
 
     return false;
 }
 
 void
-TunnelStateData::copy(size_t len, Connection &from, Connection &to, IOCB 
*completion)
+TunnelStateData::copy(Connection &from, Connection &to, IOCB *completion)
 {
     debugs(26, 3, HERE << "Schedule Write");
     AsyncCall::Pointer call = commCbCall(5,5, "TunnelBlindCopyWriteHandler",
                                          CommIoCbPtrFun(completion, this));
-    Comm::Write(to.conn, from.buf, len, call, NULL);
+    Comm::Write(to.conn, from.buf, call);
 }
 
 /* Writes data from the client buffer to the server side */
 void
-TunnelStateData::WriteServerDone(const Comm::ConnectionPointer &, char *buf, 
size_t len, comm_err_t flag, int xerrno, void *data)
+TunnelStateData::WriteServerDone(const Comm::ConnectionPointer &, char *, 
size_t len, comm_err_t flag, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));
 
-    tunnelState->writeServerDone(buf, len, flag, xerrno);
+    tunnelState->writeServerDone(len, flag, xerrno);
 }
 
 void
-TunnelStateData::writeServerDone(char *buf, size_t len, comm_err_t flag, int 
xerrno)
+TunnelStateData::writeServerDone(size_t len, comm_err_t flag, int xerrno)
 {
     debugs(26, 3, HERE  << server.conn << ", " << len << " bytes written, 
flag=" << flag);
 
     /* Error? */
     if (flag != COMM_OK) {
         if (flag != COMM_ERR_CLOSING) {
             debugs(26, 4, HERE << "calling TunnelStateData::server.error(" << 
xerrno <<")");
             server.error(xerrno); // may call comm_close
         }
         return;
     }
 
     /* EOF? */
     if (len == 0) {
         debugs(26, 4, HERE << "No read input. Closing server connection.");
         server.conn->close();
         return;
     }
 
     /* Valid data */
     kb_incr(&(statCounter.server.all.kbytes_out), len);
     kb_incr(&(statCounter.server.other.kbytes_out), len);
     client.dataSent(len);
 
     /* If the other end has closed, so should we */
     if (!Comm::IsConnOpen(client.conn)) {
         debugs(26, 4, HERE << "Client gone away. Shutting down server 
connection.");
         server.conn->close();
         return;
     }
 
     const CbcPointer<TunnelStateData> safetyLock(this);        /* ??? should 
be locked by the caller... */
 
     if (cbdataReferenceValid(this))
         copyRead(client, ReadClient);
 }
 
 /* Writes data from the server buffer to the client side */
 void
-TunnelStateData::WriteClientDone(const Comm::ConnectionPointer &, char *buf, 
size_t len, comm_err_t flag, int xerrno, void *data)
+TunnelStateData::WriteClientDone(const Comm::ConnectionPointer &, char *, 
size_t len, comm_err_t flag, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));
 
-    tunnelState->writeClientDone(buf, len, flag, xerrno);
+    tunnelState->writeClientDone(len, flag, xerrno);
 }
 
 void
 TunnelStateData::Connection::dataSent(size_t amount)
 {
-    debugs(26, 3, HERE << "len=" << len << " - amount=" << amount);
-    assert(amount == (size_t)len);
-    len =0;
-    /* increment total object size */
+    debugs(26, 3, amount << " bytes written");
 
+    /* increment total object size */
     if (size_ptr)
         *size_ptr += amount;
 }
 
 void
-TunnelStateData::writeClientDone(char *buf, size_t len, comm_err_t flag, int 
xerrno)
+TunnelStateData::writeClientDone(size_t len, comm_err_t flag, int xerrno)
 {
     debugs(26, 3, HERE << client.conn << ", " << len << " bytes written, 
flag=" << flag);
 
     /* Error? */
     if (flag != COMM_OK) {
         if (flag != COMM_ERR_CLOSING) {
             debugs(26, 4, HERE << "Closing client connection due to comm 
flags.");
             client.error(xerrno); // may call comm_close
         }
         return;
     }
 
     /* EOF? */
     if (len == 0) {
         debugs(26, 4, HERE << "Closing client connection due to 0 byte read.");
         client.conn->close();
         return;
     }
 
     /* Valid data */
@@ -665,86 +637,86 @@
 {
     TunnelStateData *tunnelState = static_cast<TunnelStateData *>(io.data);
     debugs(26, 3, HERE << io.conn);
     /* Temporary lock to protect our own feets (comm_close -> 
tunnelClientClosed -> Free) */
     CbcPointer<TunnelStateData> safetyLock(tunnelState);
 
     tunnelState->client.closeIfOpen();
     tunnelState->server.closeIfOpen();
 }
 
 void
 TunnelStateData::Connection::closeIfOpen()
 {
     if (Comm::IsConnOpen(conn))
         conn->close();
 }
 
 void
 TunnelStateData::copyRead(Connection &from, IOCB *completion)
 {
-    assert(from.len == 0);
     AsyncCall::Pointer call = commCbCall(5,4, "TunnelBlindCopyReadHandler",
                                          CommIoCbPtrFun(completion, this));
-    comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), 
call);
+    from.buf.reserveSpace(from.bytesWanted(1, SQUID_TCP_SO_RCVBUF));
+    comm_read(from.conn, from.buf, call);
 }
 
 void
 TunnelStateData::readConnectResponse()
 {
     assert(waitingForConnectResponse());
 
     AsyncCall::Pointer call = commCbCall(5,4, "readConnectResponseDone",
                                          
CommIoCbPtrFun(ReadConnectResponseDone, this));
     comm_read(server.conn, connectRespBuf->space(),
               server.bytesWanted(1, connectRespBuf->spaceSize()), call);
 }
 
 /**
  * Set the HTTP status for this request and sets the read handlers for client
  * and server side connections.
  */
 static void
 tunnelStartShoveling(TunnelStateData *tunnelState)
 {
     assert(!tunnelState->waitingForConnectExchange());
     *tunnelState->status_ptr = Http::scOkay;
     if (cbdataReferenceValid(tunnelState)) {
 
         // Shovel any payload already pushed into reply buffer by the server 
response
-        if (!tunnelState->server.len)
+        if (!tunnelState->server.buf.length())
             tunnelState->copyRead(tunnelState->server, 
TunnelStateData::ReadServer);
         else {
-            debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" << Raw("", 
tunnelState->server.buf, tunnelState->server.len) << "\n----------");
-            tunnelState->copy(tunnelState->server.len, tunnelState->server, 
tunnelState->client, TunnelStateData::WriteClientDone);
+            debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" << 
tunnelState->server.buf << "\n----------");
+            tunnelState->copy(tunnelState->server, tunnelState->client, 
TunnelStateData::WriteClientDone);
         }
 
         // Bug 3371: shovel any payload already pushed into ConnStateData by 
the client request
         if (tunnelState->http.valid() && tunnelState->http->getConn() && 
!tunnelState->http->getConn()->in.buf.isEmpty()) {
             struct ConnStateData::In *in = &tunnelState->http->getConn()->in;
             debugs(26, DBG_DATA, "Tunnel client PUSH Payload: \n" << in->buf 
<< "\n----------");
 
             // We just need to ensure the bytes from ConnStateData are in 
client.buf already to deliver
-            memcpy(tunnelState->client.buf, in->buf.rawContent(), 
in->buf.length());
-            // NP: readClient() takes care of buffer length accounting.
-            tunnelState->readClient(tunnelState->client.buf, in->buf.length(), 
COMM_OK, 0);
+            tunnelState->client.buf = in->buf;
             in->buf.consume(); // ConnStateData buffer accounting after the 
shuffle.
+            // NP: readClient() takes care of buffer length accounting.
+            tunnelState->readClient(tunnelState->client.buf.length(), COMM_OK, 
0);
         } else
             tunnelState->copyRead(tunnelState->client, 
TunnelStateData::ReadClient);
     }
 }
 
 /**
  * All the pieces we need to write to client and/or server connection
  * have been written.
  * Call the tunnelStartShoveling to start the blind pump.
  */
 static void
 tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *buf, 
size_t size, comm_err_t flag, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     debugs(26, 3, HERE << conn << ", flag=" << flag);
 
     if (flag != COMM_OK) {
         *tunnelState->status_ptr = Http::scInternalServerError;
         tunnelErrorComplete(conn->fd, data, 0);
         return;
@@ -838,41 +810,41 @@
             Comm::ConnOpener *cs = new 
Comm::ConnOpener(tunnelState->serverDestinations[0], call, 
Config.Timeout.connect);
             cs->setHost(tunnelState->url);
             AsyncJob::Start(cs);
         } else {
             debugs(26, 4, HERE << "terminate with error.");
             ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, 
Http::scServiceUnavailable, tunnelState->request.getRaw());
             *tunnelState->status_ptr = Http::scServiceUnavailable;
             err->xerrno = xerrno;
             // on timeout is this still:    err->xerrno = ETIMEDOUT;
             err->port = conn->remote.port();
             err->callback = tunnelErrorComplete;
             err->callback_data = tunnelState;
             errorSend(tunnelState->client.conn, err);
         }
         return;
     }
 
 #if USE_DELAY_POOLS
     /* no point using the delayIsNoDelay stuff since tunnel is nice and simple 
*/
     if (conn->getPeer() && conn->getPeer()->options.no_delay)
-        tunnelState->server.setDelayId(DelayId());
+        tunnelState->server.delayId = DelayId();
 #endif
 
     tunnelState->request->hier.note(conn, tunnelState->getHost());
 
     tunnelState->server.conn = conn;
     tunnelState->request->peer_host = conn->getPeer() ? conn->getPeer()->host 
: NULL;
     comm_add_close_handler(conn->fd, tunnelServerClosed, tunnelState);
 
     debugs(26, 4, HERE << "determine post-connect handling pathway.");
     if (conn->getPeer()) {
         tunnelState->request->peer_login = conn->getPeer()->login;
         tunnelState->request->flags.proxying = 
!(conn->getPeer()->options.originserver);
     } else {
         tunnelState->request->peer_login = NULL;
         tunnelState->request->flags.proxying = false;
     }
 
     if (tunnelState->request->flags.proxying)
         tunnelState->connectToPeer();
     else {
@@ -906,41 +878,41 @@
          * default is to allow.
          */
         ACLFilledChecklist ch(Config.accessList.miss, request, NULL);
         ch.src_addr = request->client_addr;
         ch.my_addr = request->my_addr;
         if (ch.fastCheck() == ACCESS_DENIED) {
             debugs(26, 4, HERE << "MISS access forbidden.");
             err = new ErrorState(ERR_FORWARDING_DENIED, Http::scForbidden, 
request);
             *status_ptr = Http::scForbidden;
             errorSend(http->getConn()->clientConnection, err);
             return;
         }
     }
 
     debugs(26, 3, request->method << ' ' << url << ' ' << request->http_ver);
     ++statCounter.server.all.requests;
     ++statCounter.server.other.requests;
 
     tunnelState = new TunnelStateData;
 #if USE_DELAY_POOLS
-    tunnelState->server.setDelayId(DelayId::DelayClient(http));
+    tunnelState->server.delayId = DelayId::DelayClient(http);
 #endif
     tunnelState->url = xstrdup(url);
     tunnelState->request = request;
     tunnelState->server.size_ptr = size_ptr;
     tunnelState->status_ptr = status_ptr;
     tunnelState->client.conn = http->getConn()->clientConnection;
     tunnelState->http = http;
     tunnelState->al = al;
 
     comm_add_close_handler(tunnelState->client.conn->fd,
                            tunnelClientClosed,
                            tunnelState);
 
     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                      CommTimeoutCbPtrFun(tunnelTimeout, 
tunnelState));
     commSetConnTimeout(tunnelState->client.conn, Config.Timeout.lifetime, 
timeoutCall);
 
     peerSelect(&(tunnelState->serverDestinations), request, al,
                NULL,
                tunnelPeerSelectComplete,
@@ -1063,29 +1035,20 @@
     delete err;
 
     GetMarkingsToServer(tunnelState->request.getRaw(), 
*tunnelState->serverDestinations[0]);
 
     debugs(26, 3, HERE << "paths=" << peer_paths->size() << ", p[0]={" << 
(*peer_paths)[0] << "}, serverDest[0]={" <<
            tunnelState->serverDestinations[0] << "}");
 
     AsyncCall::Pointer call = commCbCall(26,3, "tunnelConnectDone", 
CommConnectCbPtrFun(tunnelConnectDone, tunnelState));
     Comm::ConnOpener *cs = new 
Comm::ConnOpener(tunnelState->serverDestinations[0], call, 
Config.Timeout.connect);
     cs->setHost(tunnelState->url);
     AsyncJob::Start(cs);
 }
 
 CBDATA_CLASS_INIT(TunnelStateData);
 
 bool
 TunnelStateData::noConnections() const
 {
     return !Comm::IsConnOpen(server.conn) && !Comm::IsConnOpen(client.conn);
 }
-
-#if USE_DELAY_POOLS
-void
-TunnelStateData::Connection::setDelayId(DelayId const &newDelay)
-{
-    delayId = newDelay;
-}
-
-#endif

Reply via email to