This patch adds a Comm::Write API for accepting SBuf output buffers. Unlike the MemBuf API, it does not deallocate the backing store once written.
Unlike the char* API, the buffer can be appended to before write(2) has completed, and the appended bytes will be handled as part of the original write request. All other behaviour is identical between the three Comm::Write APIs. To get it into action and tested, the main tunnel.cc I/O "data pipe" buffers are converted to SBuf. Initial testing with a few hundred CONNECT requests in real traffic shows no issues. We even get rid of one potential data copy when passing the initial client read buffer in from ConnStateData. NP: at this point no effort is made to take advantage of the early-append possibility. The buffers behind the setup request/response handling are also left as MemBuf, because the HttpMsg::parse() and packerToMemInit() APIs are missing SBuf support. Amos
=== modified file 'src/comm/Write.cc' --- src/comm/Write.cc 2014-02-21 10:46:19 +0000 +++ src/comm/Write.cc 2014-05-11 15:03:02 +0000 @@ -1,118 +1,145 @@ #include "squid.h" #include "comm/Connection.h" #include "comm/IoCallback.h" #include "comm/Write.h" #include "fd.h" #include "fde.h" #include "globals.h" #include "MemBuf.h" #include "profiler/Profiler.h" +#include "SBuf.h" #include "SquidTime.h" #include "StatCounters.h" #if USE_DELAY_POOLS #include "ClientInfo.h" #endif #include <cerrno> void +Comm::Write(const Comm::ConnectionPointer &conn, SBuf &sb, AsyncCall::Pointer &callback) +{ + debugs(5, 5, conn << ": sz " << sb.length() << ": asynCall " << callback); + + /* Make sure we are open, not closing, and not writing */ + assert(fd_table[conn->fd].flags.open); + assert(!fd_table[conn->fd].closing()); + Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd); + assert(!ccb->active()); + + fd_table[conn->fd].writeStart = squid_curtime; + ccb->conn = conn; + /* Queue the write */ + ccb->setCallback(IOCB_WRITE, callback, NULL, NULL, sb.length()); + ccb->buf2 = &sb; + ccb->selectOrQueueWrite(); +} + +/// \deprecated use SBuf for I/O buffer instead +void Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback) { Comm::Write(conn, mb->buf, mb->size, callback, mb->freeFunc()); } +/// \deprecated use SBuf for I/O buffer instead void Comm::Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func) { debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback); /* Make sure we are open, not closing, and not writing */ assert(fd_table[conn->fd].flags.open); assert(!fd_table[conn->fd].closing()); Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd); assert(!ccb->active()); fd_table[conn->fd].writeStart = squid_curtime; ccb->conn = conn; /* Queue the write */ ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size); ccb->selectOrQueueWrite(); } /** Write to FD. 
* This function is used by the lowest level of IO loop which only has access to FD numbers. * We have to use the comm iocb_table to map FD numbers to waiting data and Comm::Connections. * Once the write has been concluded we schedule the waiting call with success/fail results. */ void Comm::HandleWrite(int fd, void *data) { Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data); int len = 0; int nleft; assert(state->conn != NULL && state->conn->fd == fd); PROF_start(commHandleWrite); - debugs(5, 5, HERE << state->conn << ": off " << - (long int) state->offset << ", sz " << (long int) state->size << "."); - nleft = state->size - state->offset; + debugs(5, 5, state->conn << ": off " << state->offset << ", sz " << state->size); + if (state->buf2) + nleft = state->buf2->length(); + else + nleft = state->size - state->offset; #if USE_DELAY_POOLS ClientInfo * clientInfo=fd_table[fd].clientInfo; if (clientInfo && !clientInfo->writeLimitingActive) clientInfo = NULL; // we only care about quota limits here if (clientInfo) { assert(clientInfo->selectWaiting); clientInfo->selectWaiting = false; assert(clientInfo->hasQueue()); assert(clientInfo->quotaPeekFd() == fd); clientInfo->quotaDequeue(); // we will write or requeue below if (nleft > 0) { const int quota = clientInfo->quotaForDequed(); if (!quota) { // if no write quota left, queue this fd state->quotaQueueReserv = clientInfo->quotaEnqueue(fd); clientInfo->kickQuotaQueue(); PROF_stop(commHandleWrite); return; } const int nleft_corrected = min(nleft, quota); if (nleft != nleft_corrected) { debugs(5, 5, HERE << state->conn << " writes only " << nleft_corrected << " out of " << nleft); nleft = nleft_corrected; } } } #endif /* USE_DELAY_POOLS */ /* actually WRITE data */ - len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); - debugs(5, 5, HERE << "write() returns " << len); + if (state->buf2) + len = FD_WRITE_METHOD(fd, state->buf2->rawContent(), nleft); + else + len = FD_WRITE_METHOD(fd, state->buf + 
state->offset, nleft); + debugs(5, 5, "write() returns " << len); #if USE_DELAY_POOLS if (clientInfo) { if (len > 0) { /* we wrote data - drain them from bucket */ clientInfo->bucketSize -= len; if (clientInfo->bucketSize < 0.0) { debugs(5, DBG_IMPORTANT, HERE << "drained too much"); // should not happen clientInfo->bucketSize = 0; } } // even if we wrote nothing, we were served; give others a chance clientInfo->kickQuotaQueue(); } #endif /* USE_DELAY_POOLS */ fd_bytes(fd, len, FD_WRITE); ++statCounter.syscalls.sock.writes; // After each successful partial write, @@ -124,31 +151,33 @@ /* We're done */ if (nleft != 0) debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining."); state->finish(nleft ? COMM_ERROR : COMM_OK, errno); } else if (len < 0) { /* An error */ if (fd_table[fd].flags.socket_eof) { debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); state->finish(nleft ? COMM_ERROR : COMM_OK, errno); } else if (ignoreErrno(errno)) { debugs(50, 9, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); state->selectOrQueueWrite(); } else { debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); state->finish(nleft ? COMM_ERROR : COMM_OK, errno); } } else { /* A successful write, continue */ state->offset += len; + if (state->buf2) + state->buf2->consume(len); if (state->offset < state->size) { /* Not done, reinstall the write handler and write some more */ state->selectOrQueueWrite(); } else { state->finish(nleft ? COMM_OK : COMM_ERROR, errno); } } PROF_stop(commHandleWrite); } === modified file 'src/comm/Write.h' --- src/comm/Write.h 2012-08-14 11:53:07 +0000 +++ src/comm/Write.h 2014-05-11 14:47:10 +0000 @@ -1,34 +1,44 @@ #ifndef _SQUID_COMM_IOWRITE_H #define _SQUID_COMM_IOWRITE_H #include "base/AsyncCall.h" #include "comm/forward.h" #include "typedefs.h" class MemBuf; +class SBuf; + namespace Comm { /** * Queue a write. 
callback is scheduled when the write * completes, on error, or on file descriptor close. * * free_func is used to free the passed buffer when the write has completed. */ void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func); /** * Queue a write. callback is scheduled when the write * completes, on error, or on file descriptor close. */ void Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback); +/** + * Queue a write for an SBuf contents. The SBuf content is consume()'d as it is written. + * callback is scheduled when the SBuf is emptied, on error, or on file descriptor close. + * If the SBuf is added to while write(2) are ongoing the additional bytes will also be + * written before the callback is scheduled. + */ +void Write(const Comm::ConnectionPointer &conn, SBuf &sb, AsyncCall::Pointer &callback); + /// Cancel the write pending on FD. No action if none pending. void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason); // callback handler to process an FD which is available for writing. extern PF HandleWrite; } // namespace Comm #endif /* _SQUID_COMM_IOWRITE_H */ === modified file 'src/tunnel.cc' --- src/tunnel.cc 2014-05-07 14:40:05 +0000 +++ src/tunnel.cc 2014-05-11 16:56:00 +0000 @@ -69,108 +69,96 @@ * then shuffling binary data between the resulting FD pair. */ /* * TODO 1: implement a read/write API on ConnStateData to send/receive blocks * of pre-formatted data. Then we can use that as the client side of the tunnel * instead of re-implementing it here and occasionally getting the ConnStateData * read/write state wrong. 
* * TODO 2: then convert this into a AsyncJob, possibly a child of 'Server' */ class TunnelStateData { public: TunnelStateData(); ~TunnelStateData(); TunnelStateData(const TunnelStateData &); // do not implement TunnelStateData &operator =(const TunnelStateData &); // do not implement class Connection; - static void ReadClient(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data); - static void ReadServer(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data); - static void WriteClientDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data); - static void WriteServerDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data); + static void ReadClient(const Comm::ConnectionPointer &, char *, size_t len, comm_err_t errcode, int xerrno, void *data); + static void ReadServer(const Comm::ConnectionPointer &, char *, size_t len, comm_err_t errcode, int xerrno, void *data); + static void WriteClientDone(const Comm::ConnectionPointer &, char *, size_t len, comm_err_t flag, int xerrno, void *data); + static void WriteServerDone(const Comm::ConnectionPointer &, char *, size_t len, comm_err_t flag, int xerrno, void *data); /// Starts reading peer response to our CONNECT request. void readConnectResponse(); /// Called when we may be done handling a CONNECT exchange with the peer. void connectExchangeCheckpoint(); bool noConnections() const; char *url; CbcPointer<ClientHttpRequest> http; HttpRequest::Pointer request; AccessLogEntryPointer al; Comm::ConnectionList serverDestinations; const char * getHost() const { return (server.conn != NULL && server.conn->getPeer() ? server.conn->getPeer()->host : request->GetHost()); }; /// Whether we are writing a CONNECT request to a peer. bool waitingForConnectRequest() const { return connectReqWriting; } /// Whether we are reading a CONNECT response from a peer. 
bool waitingForConnectResponse() const { return connectRespBuf; } /// Whether we are waiting for the CONNECT request/response exchange with the peer. bool waitingForConnectExchange() const { return waitingForConnectRequest() || waitingForConnectResponse(); } /// Whether the client sent a CONNECT request to us. bool clientExpectsConnectResponse() const { return !(request != NULL && (request->flags.interceptTproxy || request->flags.intercepted)); } class Connection { public: - Connection() : len (0), buf ((char *)xmalloc(SQUID_TCP_SO_RCVBUF)), size_ptr(NULL) {} - - ~Connection(); + Connection() : size_ptr(NULL) {} int bytesWanted(int lower=0, int upper = INT_MAX) const; - void bytesIn(int const &); -#if USE_DELAY_POOLS - - void setDelayId(DelayId const &); -#endif - void error(int const xerrno); int debugLevelForError(int const xerrno) const; /// handles a non-I/O error associated with this Connection void logicError(const char *errMsg); void closeIfOpen(); void dataSent (size_t amount); - int len; - char *buf; + SBuf buf; int64_t *size_ptr; /* pointer to size in an ConnStateData for logging */ Comm::ConnectionPointer conn; ///< The currently connected connection. - private: #if USE_DELAY_POOLS - DelayId delayId; #endif - }; Connection client, server; int *status_ptr; /* pointer to status for logging */ MemBuf *connectRespBuf; ///< accumulates peer CONNECT response when we need it bool connectReqWriting; ///< whether we are writing a CONNECT request to a peer void copyRead(Connection &from, IOCB *completion); /// continue to set up connection to a peer, going async for SSL peers void connectToPeer(); private: #if USE_OPENSSL /// Gives PeerConnector access to Answer in the TunnelStateData callback dialer. 
class MyAnswerDialer: public CallDialer, public Ssl::PeerConnector::CbDialer { public: typedef void (TunnelStateData::*Method)(Ssl::PeerConnectorAnswer &); @@ -181,208 +169,196 @@ virtual bool canDial(AsyncCall &call) { return tunnel_.valid(); } void dial(AsyncCall &call) { ((&(*tunnel_))->*method_)(answer_); } virtual void print(std::ostream &os) const { os << '(' << tunnel_.get() << ", " << answer_ << ')'; } /* Ssl::PeerConnector::CbDialer API */ virtual Ssl::PeerConnectorAnswer &answer() { return answer_; } private: Method method_; CbcPointer<TunnelStateData> tunnel_; Ssl::PeerConnectorAnswer answer_; }; void connectedToPeer(Ssl::PeerConnectorAnswer &answer); #endif CBDATA_CLASS2(TunnelStateData); bool keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to); - void copy(size_t len, Connection &from, Connection &to, IOCB *); + void copy(Connection &from, Connection &to, IOCB *); void handleConnectResponse(const size_t chunkSize); - void readServer(char *buf, size_t len, comm_err_t errcode, int xerrno); - void readClient(char *buf, size_t len, comm_err_t errcode, int xerrno); - void writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno); - void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno); + void readServer(size_t len, comm_err_t errcode, int xerrno); + void readClient(size_t len, comm_err_t errcode, int xerrno); + void writeClientDone(size_t len, comm_err_t flag, int xerrno); + void writeServerDone(size_t len, comm_err_t flag, int xerrno); static void ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data); void readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno); }; static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n"; static CNCB tunnelConnectDone; static ERCB tunnelErrorComplete; static CLCB tunnelServerClosed; static CLCB tunnelClientClosed; static CTCB 
tunnelTimeout; static PSC tunnelPeerSelectComplete; static void tunnelConnected(const Comm::ConnectionPointer &server, void *); static void tunnelRelayConnectRequest(const Comm::ConnectionPointer &server, void *); static void tunnelServerClosed(const CommCloseCbParams ¶ms) { TunnelStateData *tunnelState = (TunnelStateData *)params.data; debugs(26, 3, HERE << tunnelState->server.conn); tunnelState->server.conn = NULL; if (tunnelState->noConnections()) { delete tunnelState; return; } - if (!tunnelState->server.len) { + if (tunnelState->server.buf.isEmpty()) { tunnelState->client.conn->close(); return; } } static void tunnelClientClosed(const CommCloseCbParams ¶ms) { TunnelStateData *tunnelState = (TunnelStateData *)params.data; debugs(26, 3, HERE << tunnelState->client.conn); tunnelState->client.conn = NULL; if (tunnelState->noConnections()) { delete tunnelState; return; } - if (!tunnelState->client.len) { + if (tunnelState->client.buf.isEmpty()) { tunnelState->server.conn->close(); return; } } TunnelStateData::TunnelStateData() : url(NULL), http(), request(NULL), status_ptr(NULL), connectRespBuf(NULL), connectReqWriting(false) { debugs(26, 3, "TunnelStateData constructed this=" << this); } TunnelStateData::~TunnelStateData() { debugs(26, 3, "TunnelStateData destructed this=" << this); assert(noConnections()); xfree(url); serverDestinations.clear(); delete connectRespBuf; } -TunnelStateData::Connection::~Connection() -{ - safe_free(buf); -} - int TunnelStateData::Connection::bytesWanted(int lowerbound, int upperbound) const { #if USE_DELAY_POOLS return delayId.bytesWanted(lowerbound, upperbound); #else return upperbound; #endif } -void -TunnelStateData::Connection::bytesIn(int const &count) -{ - debugs(26, 3, HERE << "len=" << len << " + count=" << count); -#if USE_DELAY_POOLS - delayId.bytesIn(count); -#endif - - len += count; -} - int TunnelStateData::Connection::debugLevelForError(int const xerrno) const { #ifdef ECONNRESET if (xerrno == ECONNRESET) return 2; 
#endif if (ignoreErrno(xerrno)) return 3; return 1; } /* Read from server side and queue it for writing to the client */ void -TunnelStateData::ReadServer(const Comm::ConnectionPointer &c, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) +TunnelStateData::ReadServer(const Comm::ConnectionPointer &c, char *, size_t len, comm_err_t errcode, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert(cbdataReferenceValid(tunnelState)); debugs(26, 3, HERE << c); - tunnelState->readServer(buf, len, errcode, xerrno); + tunnelState->readServer(len, errcode, xerrno); } void -TunnelStateData::readServer(char *buf, size_t len, comm_err_t errcode, int xerrno) +TunnelStateData::readServer(size_t len, comm_err_t errcode, int xerrno) { debugs(26, 3, HERE << server.conn << ", read " << len << " bytes, err=" << errcode); /* * Bail out early on COMM_ERR_CLOSING * - close handlers will tidy up for us */ if (errcode == COMM_ERR_CLOSING) return; if (len > 0) { - server.bytesIn(len); +#if USE_DELAY_POOLS + server.delayId.bytesIn(len); +#endif kb_incr(&(statCounter.server.all.kbytes_in), len); kb_incr(&(statCounter.server.other.kbytes_in), len); } if (keepGoingAfterRead(len, errcode, xerrno, server, client)) - copy(len, server, client, WriteClientDone); + copy(server, client, WriteClientDone); } /// Called when we read [a part of] CONNECT response from the peer void TunnelStateData::readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno) { debugs(26, 3, server.conn << ", read " << len << " bytes, err=" << errcode); assert(waitingForConnectResponse()); if (errcode == COMM_ERR_CLOSING) return; if (len > 0) { connectRespBuf->appended(len); - server.bytesIn(len); +#if USE_DELAY_POOLS + server.delayId.bytesIn(len); +#endif kb_incr(&(statCounter.server.all.kbytes_in), len); kb_incr(&(statCounter.server.other.kbytes_in), len); } if (keepGoingAfterRead(len, errcode, xerrno, server, client)) handleConnectResponse(len); } /* 
Read from client side and queue it for writing to the server */ void TunnelStateData::ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert (cbdataReferenceValid (tunnelState)); tunnelState->readConnectResponseDone(buf, len, errcode, xerrno); } /// Parses [possibly incomplete] CONNECT response and reacts to it. /// If the tunnel is being closed or more response data is needed, returns false. @@ -417,230 +393,226 @@ server.logicError("huge CONNECT response from peer"); return; } // keep reading readConnectResponse(); return; } // CONNECT response was successfully parsed *status_ptr = rep.sline.status(); // bail if we did not get an HTTP 200 (Connection Established) response if (rep.sline.status() != Http::scOkay) { server.logicError("unsupported CONNECT response status code"); return; } if (rep.hdr_sz < connectRespBuf->contentSize()) { // preserve bytes that the server already sent after the CONNECT response - server.len = connectRespBuf->contentSize() - rep.hdr_sz; - memcpy(server.buf, connectRespBuf->content()+rep.hdr_sz, server.len); - } else { - // reset; delay pools were using this field to throttle CONNECT response - server.len = 0; + server.buf.append(connectRespBuf->content()+rep.hdr_sz, connectRespBuf->contentSize() - rep.hdr_sz); } delete connectRespBuf; connectRespBuf = NULL; connectExchangeCheckpoint(); } void TunnelStateData::Connection::logicError(const char *errMsg) { debugs(50, 3, conn << " closing on error: " << errMsg); conn->close(); } void TunnelStateData::Connection::error(int const xerrno) { /* XXX fixme xstrerror and xerrno... 
*/ errno = xerrno; debugs(50, debugLevelForError(xerrno), HERE << conn << ": read/write failure: " << xstrerror()); if (!ignoreErrno(xerrno)) conn->close(); } /* Read from client side and queue it for writing to the server */ void -TunnelStateData::ReadClient(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) +TunnelStateData::ReadClient(const Comm::ConnectionPointer &, char *, size_t len, comm_err_t errcode, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert (cbdataReferenceValid (tunnelState)); - tunnelState->readClient(buf, len, errcode, xerrno); + tunnelState->readClient(len, errcode, xerrno); } void -TunnelStateData::readClient(char *buf, size_t len, comm_err_t errcode, int xerrno) +TunnelStateData::readClient(size_t len, comm_err_t errcode, int xerrno) { debugs(26, 3, HERE << client.conn << ", read " << len << " bytes, err=" << errcode); /* * Bail out early on COMM_ERR_CLOSING * - close handlers will tidy up for us */ if (errcode == COMM_ERR_CLOSING) return; if (len > 0) { - client.bytesIn(len); +#if USE_DELAY_POOLS + client.delayId.bytesIn(len); +#endif kb_incr(&(statCounter.client_http.kbytes_in), len); } if (keepGoingAfterRead(len, errcode, xerrno, client, server)) - copy(len, client, server, WriteServerDone); + copy(client, server, WriteServerDone); } /// Updates state after reading from client or server. /// Returns whether the caller should use the data just read. 
bool TunnelStateData::keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to) { debugs(26, 3, HERE << "from={" << from.conn << "}, to={" << to.conn << "}"); /* I think this is to prevent free-while-in-a-callback behaviour * - RBC 20030229 * from.conn->close() / to.conn->close() done here trigger close callbacks which may free TunnelStateData */ const CbcPointer<TunnelStateData> safetyLock(this); /* Bump the source connection read timeout on any activity */ if (Comm::IsConnOpen(from.conn)) { AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", CommTimeoutCbPtrFun(tunnelTimeout, this)); commSetConnTimeout(from.conn, Config.Timeout.read, timeoutCall); } /* Bump the dest connection read timeout on any activity */ /* see Bug 3659: tunnels can be weird, with very long one-way transfers */ if (Comm::IsConnOpen(to.conn)) { AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", CommTimeoutCbPtrFun(tunnelTimeout, this)); commSetConnTimeout(to.conn, Config.Timeout.read, timeoutCall); } if (errcode) from.error (xerrno); else if (len == 0 || !Comm::IsConnOpen(to.conn)) { debugs(26, 3, HERE << "Nothing to write or client gone. 
Terminate the tunnel."); from.conn->close(); /* Only close the remote end if we've finished queueing data to it */ - if (from.len == 0 && Comm::IsConnOpen(to.conn) ) { + if (from.buf.isEmpty() && Comm::IsConnOpen(to.conn) ) { to.conn->close(); } } else if (cbdataReferenceValid(this)) { return true; } return false; } void -TunnelStateData::copy(size_t len, Connection &from, Connection &to, IOCB *completion) +TunnelStateData::copy(Connection &from, Connection &to, IOCB *completion) { debugs(26, 3, HERE << "Schedule Write"); AsyncCall::Pointer call = commCbCall(5,5, "TunnelBlindCopyWriteHandler", CommIoCbPtrFun(completion, this)); - Comm::Write(to.conn, from.buf, len, call, NULL); + Comm::Write(to.conn, from.buf, call); } /* Writes data from the client buffer to the server side */ void -TunnelStateData::WriteServerDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) +TunnelStateData::WriteServerDone(const Comm::ConnectionPointer &, char *, size_t len, comm_err_t flag, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert (cbdataReferenceValid (tunnelState)); - tunnelState->writeServerDone(buf, len, flag, xerrno); + tunnelState->writeServerDone(len, flag, xerrno); } void -TunnelStateData::writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno) +TunnelStateData::writeServerDone(size_t len, comm_err_t flag, int xerrno) { debugs(26, 3, HERE << server.conn << ", " << len << " bytes written, flag=" << flag); /* Error? */ if (flag != COMM_OK) { if (flag != COMM_ERR_CLOSING) { debugs(26, 4, HERE << "calling TunnelStateData::server.error(" << xerrno <<")"); server.error(xerrno); // may call comm_close } return; } /* EOF? */ if (len == 0) { debugs(26, 4, HERE << "No read input. 
Closing server connection."); server.conn->close(); return; } /* Valid data */ kb_incr(&(statCounter.server.all.kbytes_out), len); kb_incr(&(statCounter.server.other.kbytes_out), len); client.dataSent(len); /* If the other end has closed, so should we */ if (!Comm::IsConnOpen(client.conn)) { debugs(26, 4, HERE << "Client gone away. Shutting down server connection."); server.conn->close(); return; } const CbcPointer<TunnelStateData> safetyLock(this); /* ??? should be locked by the caller... */ if (cbdataReferenceValid(this)) copyRead(client, ReadClient); } /* Writes data from the server buffer to the client side */ void -TunnelStateData::WriteClientDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) +TunnelStateData::WriteClientDone(const Comm::ConnectionPointer &, char *, size_t len, comm_err_t flag, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert (cbdataReferenceValid (tunnelState)); - tunnelState->writeClientDone(buf, len, flag, xerrno); + tunnelState->writeClientDone(len, flag, xerrno); } void TunnelStateData::Connection::dataSent(size_t amount) { - debugs(26, 3, HERE << "len=" << len << " - amount=" << amount); - assert(amount == (size_t)len); - len =0; - /* increment total object size */ + debugs(26, 3, amount << " bytes written"); + /* increment total object size */ if (size_ptr) *size_ptr += amount; } void -TunnelStateData::writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno) +TunnelStateData::writeClientDone(size_t len, comm_err_t flag, int xerrno) { debugs(26, 3, HERE << client.conn << ", " << len << " bytes written, flag=" << flag); /* Error? */ if (flag != COMM_OK) { if (flag != COMM_ERR_CLOSING) { debugs(26, 4, HERE << "Closing client connection due to comm flags."); client.error(xerrno); // may call comm_close } return; } /* EOF? 
*/ if (len == 0) { debugs(26, 4, HERE << "Closing client connection due to 0 byte read."); client.conn->close(); return; } /* Valid data */ @@ -665,86 +637,86 @@ { TunnelStateData *tunnelState = static_cast<TunnelStateData *>(io.data); debugs(26, 3, HERE << io.conn); /* Temporary lock to protect our own feets (comm_close -> tunnelClientClosed -> Free) */ CbcPointer<TunnelStateData> safetyLock(tunnelState); tunnelState->client.closeIfOpen(); tunnelState->server.closeIfOpen(); } void TunnelStateData::Connection::closeIfOpen() { if (Comm::IsConnOpen(conn)) conn->close(); } void TunnelStateData::copyRead(Connection &from, IOCB *completion) { - assert(from.len == 0); AsyncCall::Pointer call = commCbCall(5,4, "TunnelBlindCopyReadHandler", CommIoCbPtrFun(completion, this)); - comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), call); + from.buf.reserveSpace(from.bytesWanted(1, SQUID_TCP_SO_RCVBUF)); + comm_read(from.conn, from.buf, call); } void TunnelStateData::readConnectResponse() { assert(waitingForConnectResponse()); AsyncCall::Pointer call = commCbCall(5,4, "readConnectResponseDone", CommIoCbPtrFun(ReadConnectResponseDone, this)); comm_read(server.conn, connectRespBuf->space(), server.bytesWanted(1, connectRespBuf->spaceSize()), call); } /** * Set the HTTP status for this request and sets the read handlers for client * and server side connections. 
*/ static void tunnelStartShoveling(TunnelStateData *tunnelState) { assert(!tunnelState->waitingForConnectExchange()); *tunnelState->status_ptr = Http::scOkay; if (cbdataReferenceValid(tunnelState)) { // Shovel any payload already pushed into reply buffer by the server response - if (!tunnelState->server.len) + if (!tunnelState->server.buf.length()) tunnelState->copyRead(tunnelState->server, TunnelStateData::ReadServer); else { - debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" << Raw("", tunnelState->server.buf, tunnelState->server.len) << "\n----------"); - tunnelState->copy(tunnelState->server.len, tunnelState->server, tunnelState->client, TunnelStateData::WriteClientDone); + debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" << tunnelState->server.buf << "\n----------"); + tunnelState->copy(tunnelState->server, tunnelState->client, TunnelStateData::WriteClientDone); } // Bug 3371: shovel any payload already pushed into ConnStateData by the client request if (tunnelState->http.valid() && tunnelState->http->getConn() && !tunnelState->http->getConn()->in.buf.isEmpty()) { struct ConnStateData::In *in = &tunnelState->http->getConn()->in; debugs(26, DBG_DATA, "Tunnel client PUSH Payload: \n" << in->buf << "\n----------"); // We just need to ensure the bytes from ConnStateData are in client.buf already to deliver - memcpy(tunnelState->client.buf, in->buf.rawContent(), in->buf.length()); - // NP: readClient() takes care of buffer length accounting. - tunnelState->readClient(tunnelState->client.buf, in->buf.length(), COMM_OK, 0); + tunnelState->client.buf = in->buf; in->buf.consume(); // ConnStateData buffer accounting after the shuffle. + // NP: readClient() takes care of buffer length accounting. + tunnelState->readClient(tunnelState->client.buf.length(), COMM_OK, 0); } else tunnelState->copyRead(tunnelState->client, TunnelStateData::ReadClient); } } /** * All the pieces we need to write to client and/or server connection * have been written. 
* Call the tunnelStartShoveling to start the blind pump. */ static void tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t flag, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; debugs(26, 3, HERE << conn << ", flag=" << flag); if (flag != COMM_OK) { *tunnelState->status_ptr = Http::scInternalServerError; tunnelErrorComplete(conn->fd, data, 0); return; @@ -838,41 +810,41 @@ Comm::ConnOpener *cs = new Comm::ConnOpener(tunnelState->serverDestinations[0], call, Config.Timeout.connect); cs->setHost(tunnelState->url); AsyncJob::Start(cs); } else { debugs(26, 4, HERE << "terminate with error."); ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, Http::scServiceUnavailable, tunnelState->request.getRaw()); *tunnelState->status_ptr = Http::scServiceUnavailable; err->xerrno = xerrno; // on timeout is this still: err->xerrno = ETIMEDOUT; err->port = conn->remote.port(); err->callback = tunnelErrorComplete; err->callback_data = tunnelState; errorSend(tunnelState->client.conn, err); } return; } #if USE_DELAY_POOLS /* no point using the delayIsNoDelay stuff since tunnel is nice and simple */ if (conn->getPeer() && conn->getPeer()->options.no_delay) - tunnelState->server.setDelayId(DelayId()); + tunnelState->server.delayId = DelayId(); #endif tunnelState->request->hier.note(conn, tunnelState->getHost()); tunnelState->server.conn = conn; tunnelState->request->peer_host = conn->getPeer() ? 
conn->getPeer()->host : NULL; comm_add_close_handler(conn->fd, tunnelServerClosed, tunnelState); debugs(26, 4, HERE << "determine post-connect handling pathway."); if (conn->getPeer()) { tunnelState->request->peer_login = conn->getPeer()->login; tunnelState->request->flags.proxying = !(conn->getPeer()->options.originserver); } else { tunnelState->request->peer_login = NULL; tunnelState->request->flags.proxying = false; } if (tunnelState->request->flags.proxying) tunnelState->connectToPeer(); else { @@ -906,41 +878,41 @@ * default is to allow. */ ACLFilledChecklist ch(Config.accessList.miss, request, NULL); ch.src_addr = request->client_addr; ch.my_addr = request->my_addr; if (ch.fastCheck() == ACCESS_DENIED) { debugs(26, 4, HERE << "MISS access forbidden."); err = new ErrorState(ERR_FORWARDING_DENIED, Http::scForbidden, request); *status_ptr = Http::scForbidden; errorSend(http->getConn()->clientConnection, err); return; } } debugs(26, 3, request->method << ' ' << url << ' ' << request->http_ver); ++statCounter.server.all.requests; ++statCounter.server.other.requests; tunnelState = new TunnelStateData; #if USE_DELAY_POOLS - tunnelState->server.setDelayId(DelayId::DelayClient(http)); + tunnelState->server.delayId = DelayId::DelayClient(http); #endif tunnelState->url = xstrdup(url); tunnelState->request = request; tunnelState->server.size_ptr = size_ptr; tunnelState->status_ptr = status_ptr; tunnelState->client.conn = http->getConn()->clientConnection; tunnelState->http = http; tunnelState->al = al; comm_add_close_handler(tunnelState->client.conn->fd, tunnelClientClosed, tunnelState); AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", CommTimeoutCbPtrFun(tunnelTimeout, tunnelState)); commSetConnTimeout(tunnelState->client.conn, Config.Timeout.lifetime, timeoutCall); peerSelect(&(tunnelState->serverDestinations), request, al, NULL, tunnelPeerSelectComplete, @@ -1063,29 +1035,20 @@ delete err; GetMarkingsToServer(tunnelState->request.getRaw(), 
*tunnelState->serverDestinations[0]); debugs(26, 3, HERE << "paths=" << peer_paths->size() << ", p[0]={" << (*peer_paths)[0] << "}, serverDest[0]={" << tunnelState->serverDestinations[0] << "}"); AsyncCall::Pointer call = commCbCall(26,3, "tunnelConnectDone", CommConnectCbPtrFun(tunnelConnectDone, tunnelState)); Comm::ConnOpener *cs = new Comm::ConnOpener(tunnelState->serverDestinations[0], call, Config.Timeout.connect); cs->setHost(tunnelState->url); AsyncJob::Start(cs); } CBDATA_CLASS_INIT(TunnelStateData); bool TunnelStateData::noConnections() const { return !Comm::IsConnOpen(server.conn) && !Comm::IsConnOpen(client.conn); } - -#if USE_DELAY_POOLS -void -TunnelStateData::Connection::setDelayId(DelayId const &newDelay) -{ - delayId = newDelay; -} - -#endif