On 13/05/2014 10:46 a.m., Alex Rousskov wrote:
> On 05/11/2014 11:16 AM, Amos Jeffries wrote:
>
>> This patch adds a Comm::Write API for accepting SBuf output buffers.
>
>> Unlike the char* API the buffer can be appended to before write(2) is
>> completed and the appended bytes will be handled as part of the original
>> write request.
>
> That approach scares me because the caller has to be extra careful not
> to leave the being-updated-area of the buffer in inconsistent state
> across async calls. However, I doubt we have such complex callers today,
> and the approach also reduces the number of usually pointless write
> callback calls, so I am willing to experiment.

If you have a better approach I would like to hear it. The only one I
can think of is possibly ref-counting the SBuf instance itself. Adding
that capability, though, seems to imply that SBuf::Pointer in general is
better than just passing SBuf references around (it's not).

The raw-pointer usage which appears to be scaring you is the existing
status quo anyway...

* The char* API passes a raw pointer to the store/buffer. Any alterations
  of the content are racing the actual write(2) call for those bytes,
  appends are lost on callback, and truncations are not seen by the
  write(2). Corrupted output one way or another.

  => requirement that the caller do nothing other than allocate a new
  buffer while the write is scheduled. The buffer may be re-used after
  the callback.

* The MemBuf API passes a raw pointer to its _internal_ data
  store/buffer. It goes further by passing the store deallocator. Any
  alterations of the content are racing the actual write(2) call for
  those bytes, appends are either lost on callback or reallocate the
  backing store for the caller, and truncations are not seen by the
  write(2). All operations are potentially done on a buffer pointer
  already deallocated by comm.

  => requirement that the caller do nothing other than MemBuf::init()
  while the write is scheduled (i.e. allocate a new buffer). The same
  condition applies even after the callback (ouch).

* The SBuf API passes a reference/raw-pointer to the smart pointer SBuf
  instance. All operations on the MemBlob backing store remain valid.

  => requirement that the caller do nothing to the SBuf instance other
  than allocate a new one while the write is scheduled.

  ==> Note this is the same requirement already made by both other APIs.
  But the caller is no longer restricted in what it can do with the
  backing store, i.e. it can continue to use the SBuf it has for regular
  SBuf operations.

Since these SBuf are members of a caller Job (or equivalent state
object) instead of a dynamically allocated/deallocated SBuf object, that
seems a reasonable requirement.

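To make the SBuf case in the comparison above concrete, here is a minimal C++ sketch. It is not Squid code: std::string stands in for SBuf, and ToyPendingWrite, ToyJob and demoAppendDuringWrite are hypothetical names invented for illustration. It assumes the buffer is a member of the calling job, so the buffer object outlives the pending write, and it shows bytes appended mid-write going out as part of the same request.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>

// Hypothetical stand-in for one pending write; not the Comm::Write API.
struct ToyPendingWrite {
    std::string *buf = nullptr;  // non-owning: the caller keeps the object alive
    std::string sent;            // bytes the simulated write(2) has sent so far
    bool done = false;           // true once the callback would be scheduled

    void start(std::string *b) { buf = b; done = false; }

    // One simulated write(2): send at most maxBytes and consume them.
    void step(std::size_t maxBytes) {
        if (!buf || done)
            return;
        const std::size_t n = std::min(maxBytes, buf->size());
        sent.append(*buf, 0, n);
        buf->erase(0, n);        // plays the role of consume(n)
        if (buf->empty())
            done = true;
    }
};

// Hypothetical job: the output buffer is a member, so it lives as long as the job.
struct ToyJob {
    std::string out;
    ToyPendingWrite writer;
};

std::string demoAppendDuringWrite() {
    ToyJob job;
    job.out = "hello";
    job.writer.start(&job.out);
    job.writer.step(3);          // partial write: "hel" sent, "lo" remains
    job.out += " world";         // caller appends while the write is pending
    while (!job.writer.done)
        job.writer.step(4);      // later writes pick up the appended bytes
    return job.writer.sent;
}
```

The only thing the writer relies on here is that the buffer object itself stays alive; what the caller does to its contents between steps is simply picked up by the next step.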
>
>> All other behaviour is identical between the three Comm::Write APIs.
>
> You forgot to mention the fact that the new Write API, unlike the
> existing two, relies on the caller to keep the buffer area alive. That
> requirement, IMO, is a show-stopper. I strongly object to that change.

If by "buffer area" you mean MemBlob, that is not a requirement. It can
have anything at all done to it during Comm::Write. If it is cleared the
write will probably return 0 bytes written to the callback. Most
callbacks assume that means the connection has terminated unexpectedly.
So it's not advisable, but possible without any internal problems to
comm.

If by "buffer area" you mean SBuf, see the above comparison with the
other two APIs. I think it is reasonable to retain that requirement.
Especially since it only applies to the small SBuf instance.

One either sends an SBuf which is a member of the Job/*StateData object
itself or an SBuf member of the Params object which is delivered to the
callback. Both methods ensure it's alive until at least the callback is
scheduled.

If/when we start delivering lists of small SBuf objects that is a whole
other API needed for SBufList. OR perhaps a Job loop writing each SBuf
in the list individually ... with the Job doing this keeping the relevant
SBuf instance alive.

>
> The buf2 hack has already been added during unaudited "Convert the
> ConnStateData input buffer to SBuf" commit (r13324), so there is little
> I can do about it until it causes a bug, but I do object to the use of
> such raw/unprotected links between Comm and its callers in any code I am
> lucky to review before the commit.
>
> If others overrule this objection, please at least change the
> Comm::Write() sb argument type from reference to a pointer. Passing an
> argument by reference (implying temporary use) and then taking an
> address of that argument for long term object storage is wrong.

Fine.

>
>> /// Buffer to store read(2) into when set.
>> // This is a pointer to the Jobs buffer rather than an SBuf using
>> // the same store since we cannot know when or how the Job will
>> // alter its SBuf while we are reading.
>> SBuf *buf2;
>>
>> // Legacy c-string buffers used when buf2 is unset.
>> char *buf;
>
>
> Another side-effect is that "I wrote this, what's next?" notifications
> may now be delayed for a very long time (potentially forever) if the
> caller keeps adding content to the buffer. I suspect that most writers
> do not really need such notifications. Thank you for documenting this
> aspect (although it deserves to be mentioned in the commit message as
> well IMO).

There is no code doing this behaviour at present. I trust that we have
enough quality on new code now to ensure that it's only taken advantage
of when appropriate.

For now the (offset < size) check for re-scheduling is still in effect.
It is just that up to a TCP buffer's worth of additional bytes can be
written if they are appended early. The callback will be delivered the
accurate written amount and the buffer consume()'d correctly either way.

I've changed the "will" to "may" to avoid implying the indefinite
wait/hang situation.

In this round I have also added a check to ensure that a caller using
consume() does not cause hanging. But there are worse logic problems in
callers doing that inappropriately.

>
>> - static void ReadClient(const Comm::ConnectionPointer &, char *buf,
>> size_t len, comm_err_t errcode, int xerrno, void *data);
>> + static void ReadClient(const Comm::ConnectionPointer &, char *, size_t
>> len, comm_err_t errcode, int xerrno, void *data);
>
> Please avoid this and similar non-changes to reduce merge conflicts. It
> is OK to remove the parameter name in the method _definition_ where it
> was used and is no longer needed(**), but there is no need to update
> methods that did not use the parameter or the declarations.
> These methods should be refactored/polished when TunnelStateData becomes a
> job, but that is outside your patch scope.
>
> (**) although the existing code like TunnelStateData::readServer()
> implies that such changes are not required either.
>

Fine.

>
>> - void bytesIn(int const &);
>> -#if USE_DELAY_POOLS
>> -
>> - void setDelayId(DelayId const &);
>> -#endif
>> -
>> void error(int const xerrno);
>> int debugLevelForError(int const xerrno) const;
>> /// handles a non-I/O error associated with this Connection
>> void logicError(const char *errMsg);
>> void closeIfOpen();
>> void dataSent (size_t amount);
> ...
>> int64_t *size_ptr; /* pointer to size in an ConnStateData
>> for logging */
>>
>> Comm::ConnectionPointer conn; ///< The currently connected
>> connection.
>>
>> - private:
>> #if USE_DELAY_POOLS
>> -
>> DelayId delayId;
>> #endif
>> -
>
>> - server.bytesIn(len);
>> +#if USE_DELAY_POOLS
>> + server.delayId.bytesIn(len);
>> +#endif
>
>> - tunnelState->server.setDelayId(DelayId());
>> + tunnelState->server.delayId = DelayId();
>
> Please avoid these and similar changes if they are unrelated to your
> patch scope.
>

With the removal of client.len/server.len byte accounting the
delayId.bytesIn() is now used directly for delay pool byte accounting.
Which makes the delayId wrappers of all types needless.

>
>> if (rep.hdr_sz < connectRespBuf->contentSize()) {
>> // preserve bytes that the server already sent after the CONNECT
>> response
>> - server.len = connectRespBuf->contentSize() - rep.hdr_sz;
>> - memcpy(server.buf, connectRespBuf->content()+rep.hdr_sz,
>> server.len);
>> - } else {
>> - // reset; delay pools were using this field to throttle CONNECT
>> response
>> - server.len = 0;
>> + server.buf.append(connectRespBuf->content()+rep.hdr_sz,
>> connectRespBuf->contentSize() - rep.hdr_sz);
>> }
>
> Can you explain why the "else" clause is no longer needed? Or was it
> bogus to start with?
>

It was accumulating value due to server.bytesIn(len) being used as
access to delayId_ from readConnectResponseDone(). We now use
server.delayId.bytesIn(len) in both the read callbacks and the else case
became bogus.

>
>> - if (!tunnelState->server.len) {
>> + if (tunnelState->server.buf.isEmpty()) {
>
> and
>
>> - if (!tunnelState->client.len) {
>> + if (tunnelState->client.buf.isEmpty()) {
>
> but
>
>> - if (!tunnelState->server.len)
>> + if (!tunnelState->server.buf.length())
>
> Please be consistent: isEmpty() is probably slightly better than
> !length(), but it is your call which method to use.
>

Fixed.

>
>
>> - debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" <<
>> Raw("", tunnelState->server.buf, tunnelState->server.len) << "\n----------");
>> + debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" <<
>> tunnelState->server.buf << "\n----------");
>
> Please do not remove Raw() protection for dumping raw data.
>

SBuf provides generic operator<<(std::ostream&) for debugs. If Raw()
protection is required on the buffer it needs to be provided by that.
There are a number of places dumping SBuf contents to debugs() already.

Will do as a followup if necessary.

Amos

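The completion rule described earlier in this reply (written bytes are consumed from the buffer, the callback gets the accurate total, and a caller-side consume() must not cause a hang) can be sketched as below. This is a simplified model under stated assumptions, not the patch itself: std::string stands in for SBuf, and ToyWriteState, Next and afterWrite are hypothetical names.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>

// What the writer does after one write(2) returns (hypothetical model).
enum class Next { Reschedule, Finish };

// Hypothetical state for one pending write; not Comm::IoCallback.
struct ToyWriteState {
    std::string *buf;          // caller-owned buffer (stand-in for SBuf)
    std::size_t reported;      // total bytes the callback will be told about

    explicit ToyWriteState(std::string *b) : buf(b), reported(0) {}
};

// Apply the result of one simulated write(2) that wrote `len` bytes.
Next afterWrite(ToyWriteState &st, std::size_t len) {
    const std::size_t n = std::min(len, st.buf->size());
    st.buf->erase(0, n);       // plays the role of consume(len)
    st.reported += n;
    // An empty buffer always ends the write, whoever emptied it, so a
    // caller that consumed the rest cannot leave the writer waiting.
    return st.buf->empty() ? Next::Finish : Next::Reschedule;
}
```

In this model the decision is taken once per write event and depends only on whether the buffer is empty, so the callback would be scheduled exactly once with the accumulated byte count.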
=== modified file 'src/comm/Write.cc' --- src/comm/Write.cc 2014-02-21 10:46:19 +0000 +++ src/comm/Write.cc 2014-05-13 08:57:09 +0000 @@ -1,118 +1,145 @@ #include "squid.h" #include "comm/Connection.h" #include "comm/IoCallback.h" #include "comm/Write.h" #include "fd.h" #include "fde.h" #include "globals.h" #include "MemBuf.h" #include "profiler/Profiler.h" +#include "SBuf.h" #include "SquidTime.h" #include "StatCounters.h" #if USE_DELAY_POOLS #include "ClientInfo.h" #endif #include <cerrno> void +Comm::Write(const Comm::ConnectionPointer &conn, SBuf *sb, AsyncCall::Pointer &callback) +{ + debugs(5, 5, conn << ": sz " << sb->length() << ": asynCall " << callback); + + /* Make sure we are open, not closing, and not writing */ + assert(fd_table[conn->fd].flags.open); + assert(!fd_table[conn->fd].closing()); + Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd); + assert(!ccb->active()); + + fd_table[conn->fd].writeStart = squid_curtime; + ccb->conn = conn; + /* Queue the write */ + ccb->setCallback(IOCB_WRITE, callback, NULL, NULL, sb->length()); + ccb->buf2 = sb; + ccb->selectOrQueueWrite(); +} + +/// \deprecated use SBuf for I/O buffer instead +void Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback) { Comm::Write(conn, mb->buf, mb->size, callback, mb->freeFunc()); } +/// \deprecated use SBuf for I/O buffer instead void Comm::Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func) { debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback); /* Make sure we are open, not closing, and not writing */ assert(fd_table[conn->fd].flags.open); assert(!fd_table[conn->fd].closing()); Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd); assert(!ccb->active()); fd_table[conn->fd].writeStart = squid_curtime; ccb->conn = conn; /* Queue the write */ ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size); ccb->selectOrQueueWrite(); } /** Write to FD. 
* This function is used by the lowest level of IO loop which only has access to FD numbers. * We have to use the comm iocb_table to map FD numbers to waiting data and Comm::Connections. * Once the write has been concluded we schedule the waiting call with success/fail results. */ void Comm::HandleWrite(int fd, void *data) { Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data); int len = 0; int nleft; assert(state->conn != NULL && state->conn->fd == fd); PROF_start(commHandleWrite); - debugs(5, 5, HERE << state->conn << ": off " << - (long int) state->offset << ", sz " << (long int) state->size << "."); - nleft = state->size - state->offset; + debugs(5, 5, state->conn << ": off " << state->offset << ", sz " << state->size); + if (state->buf2) + nleft = state->buf2->length(); + else + nleft = state->size - state->offset; #if USE_DELAY_POOLS ClientInfo * clientInfo=fd_table[fd].clientInfo; if (clientInfo && !clientInfo->writeLimitingActive) clientInfo = NULL; // we only care about quota limits here if (clientInfo) { assert(clientInfo->selectWaiting); clientInfo->selectWaiting = false; assert(clientInfo->hasQueue()); assert(clientInfo->quotaPeekFd() == fd); clientInfo->quotaDequeue(); // we will write or requeue below if (nleft > 0) { const int quota = clientInfo->quotaForDequed(); if (!quota) { // if no write quota left, queue this fd state->quotaQueueReserv = clientInfo->quotaEnqueue(fd); clientInfo->kickQuotaQueue(); PROF_stop(commHandleWrite); return; } const int nleft_corrected = min(nleft, quota); if (nleft != nleft_corrected) { debugs(5, 5, HERE << state->conn << " writes only " << nleft_corrected << " out of " << nleft); nleft = nleft_corrected; } } } #endif /* USE_DELAY_POOLS */ /* actually WRITE data */ - len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); - debugs(5, 5, HERE << "write() returns " << len); + if (state->buf2) + len = FD_WRITE_METHOD(fd, state->buf2->rawContent(), nleft); + else + len = FD_WRITE_METHOD(fd, state->buf + 
state->offset, nleft); + debugs(5, 5, "write() returns " << len); #if USE_DELAY_POOLS if (clientInfo) { if (len > 0) { /* we wrote data - drain them from bucket */ clientInfo->bucketSize -= len; if (clientInfo->bucketSize < 0.0) { debugs(5, DBG_IMPORTANT, HERE << "drained too much"); // should not happen clientInfo->bucketSize = 0; } } // even if we wrote nothing, we were served; give others a chance clientInfo->kickQuotaQueue(); } #endif /* USE_DELAY_POOLS */ fd_bytes(fd, len, FD_WRITE); ++statCounter.syscalls.sock.writes; // After each successful partial write, @@ -125,30 +152,36 @@ if (nleft != 0) debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining."); state->finish(nleft ? COMM_ERROR : COMM_OK, errno); } else if (len < 0) { /* An error */ if (fd_table[fd].flags.socket_eof) { debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); state->finish(nleft ? COMM_ERROR : COMM_OK, errno); } else if (ignoreErrno(errno)) { debugs(50, 9, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); state->selectOrQueueWrite(); } else { debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); state->finish(nleft ? COMM_ERROR : COMM_OK, errno); } } else { /* A successful write, continue */ state->offset += len; + if (state->buf2) { + state->buf2->consume(len); + if (state->buf2->isEmpty()) + state->finish(nleft ? COMM_OK : COMM_ERROR, errno); + } + if (state->offset < state->size) { /* Not done, reinstall the write handler and write some more */ state->selectOrQueueWrite(); } else { state->finish(nleft ? 
COMM_OK : COMM_ERROR, errno); } } PROF_stop(commHandleWrite); } === modified file 'src/comm/Write.h' --- src/comm/Write.h 2012-08-14 11:53:07 +0000 +++ src/comm/Write.h 2014-05-13 08:56:43 +0000 @@ -1,34 +1,44 @@ #ifndef _SQUID_COMM_IOWRITE_H #define _SQUID_COMM_IOWRITE_H #include "base/AsyncCall.h" #include "comm/forward.h" #include "typedefs.h" class MemBuf; +class SBuf; + namespace Comm { /** * Queue a write. callback is scheduled when the write * completes, on error, or on file descriptor close. * * free_func is used to free the passed buffer when the write has completed. */ void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func); /** * Queue a write. callback is scheduled when the write * completes, on error, or on file descriptor close. */ void Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback); +/** + * Queue a write for an SBuf contents. The SBuf content is consume()'d as it is written. + * callback is scheduled when the SBuf is emptied, on error, or on file descriptor close. + * If the SBuf is appended to while write(2) are ongoing some additional bytes may also be + * written before the callback is scheduled. + */ +void Write(const Comm::ConnectionPointer &conn, SBuf *sb, AsyncCall::Pointer &callback); + /// Cancel the write pending on FD. No action if none pending. void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason); // callback handler to process an FD which is available for writing. extern PF HandleWrite; } // namespace Comm #endif /* _SQUID_COMM_IOWRITE_H */ === modified file 'src/neighbors.cc' --- src/neighbors.cc 2014-04-30 10:50:09 +0000 +++ src/neighbors.cc 2014-05-12 12:55:36 +0000 @@ -151,50 +151,50 @@ return p->type; } /** * \return Whether it is appropriate to fetch REQUEST from PEER. 
*/ bool peerAllowedToUse(const CachePeer * p, HttpRequest * request) { const CachePeerDomainList *d = NULL; assert(request != NULL); if (neighborType(p, request) == PEER_SIBLING) { #if PEER_MULTICAST_SIBLINGS if (p->type == PEER_MULTICAST && p->options.mcast_siblings && (request->flags.noCache || request->flags.refresh || request->flags.loopDetected || request->flags.needValidation)) debugs(15, 2, "peerAllowedToUse(" << p->name << ", " << request->GetHost() << ") : multicast-siblings optimization match"); #endif - if (request->flags.noCache) + if (request->flags.loopDetected) return false; - if (request->flags.refresh) + if (request->flags.noCache) return false; - if (request->flags.loopDetected) + if (request->flags.refresh && p->options.allow_miss) return false; - if (request->flags.needValidation) + if (request->flags.needValidation && p->options.allow_miss) return false; } // CONNECT requests are proxy requests. Not to be forwarded to origin servers. // Unless the destination port matches, in which case we MAY perform a 'DIRECT' to this CachePeer. if (p->options.originserver && request->method == Http::METHOD_CONNECT && request->port != p->in_addr.port()) return false; if (p->peer_domain == NULL && p->access == NULL) return true; bool do_ping = false; for (d = p->peer_domain; d; d = d->next) { if (0 == matchDomainName(request->GetHost(), d->domain)) { do_ping = d->do_ping; break; } do_ping = !d->do_ping; } === modified file 'src/tunnel.cc' --- src/tunnel.cc 2014-05-07 14:40:05 +0000 +++ src/tunnel.cc 2014-05-13 08:59:13 +0000 @@ -108,69 +108,57 @@ return (server.conn != NULL && server.conn->getPeer() ? server.conn->getPeer()->host : request->GetHost()); }; /// Whether we are writing a CONNECT request to a peer. bool waitingForConnectRequest() const { return connectReqWriting; } /// Whether we are reading a CONNECT response from a peer. 
bool waitingForConnectResponse() const { return connectRespBuf; } /// Whether we are waiting for the CONNECT request/response exchange with the peer. bool waitingForConnectExchange() const { return waitingForConnectRequest() || waitingForConnectResponse(); } /// Whether the client sent a CONNECT request to us. bool clientExpectsConnectResponse() const { return !(request != NULL && (request->flags.interceptTproxy || request->flags.intercepted)); } class Connection { public: - Connection() : len (0), buf ((char *)xmalloc(SQUID_TCP_SO_RCVBUF)), size_ptr(NULL) {} - - ~Connection(); + Connection() : size_ptr(NULL) {} int bytesWanted(int lower=0, int upper = INT_MAX) const; - void bytesIn(int const &); -#if USE_DELAY_POOLS - - void setDelayId(DelayId const &); -#endif - void error(int const xerrno); int debugLevelForError(int const xerrno) const; /// handles a non-I/O error associated with this Connection void logicError(const char *errMsg); void closeIfOpen(); void dataSent (size_t amount); - int len; - char *buf; + SBuf buf; int64_t *size_ptr; /* pointer to size in an ConnStateData for logging */ Comm::ConnectionPointer conn; ///< The currently connected connection. - private: #if USE_DELAY_POOLS - DelayId delayId; #endif - }; Connection client, server; int *status_ptr; /* pointer to status for logging */ MemBuf *connectRespBuf; ///< accumulates peer CONNECT response when we need it bool connectReqWriting; ///< whether we are writing a CONNECT request to a peer void copyRead(Connection &from, IOCB *completion); /// continue to set up connection to a peer, going async for SSL peers void connectToPeer(); private: #if USE_OPENSSL /// Gives PeerConnector access to Answer in the TunnelStateData callback dialer. 
class MyAnswerDialer: public CallDialer, public Ssl::PeerConnector::CbDialer { public: typedef void (TunnelStateData::*Method)(Ssl::PeerConnectorAnswer &); @@ -181,208 +169,196 @@ virtual bool canDial(AsyncCall &call) { return tunnel_.valid(); } void dial(AsyncCall &call) { ((&(*tunnel_))->*method_)(answer_); } virtual void print(std::ostream &os) const { os << '(' << tunnel_.get() << ", " << answer_ << ')'; } /* Ssl::PeerConnector::CbDialer API */ virtual Ssl::PeerConnectorAnswer &answer() { return answer_; } private: Method method_; CbcPointer<TunnelStateData> tunnel_; Ssl::PeerConnectorAnswer answer_; }; void connectedToPeer(Ssl::PeerConnectorAnswer &answer); #endif CBDATA_CLASS2(TunnelStateData); bool keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to); - void copy(size_t len, Connection &from, Connection &to, IOCB *); + void copy(Connection &from, Connection &to, IOCB *); void handleConnectResponse(const size_t chunkSize); - void readServer(char *buf, size_t len, comm_err_t errcode, int xerrno); - void readClient(char *buf, size_t len, comm_err_t errcode, int xerrno); - void writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno); - void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno); + void readServer(size_t len, comm_err_t errcode, int xerrno); + void readClient(size_t len, comm_err_t errcode, int xerrno); + void writeClientDone(size_t len, comm_err_t flag, int xerrno); + void writeServerDone(size_t len, comm_err_t flag, int xerrno); static void ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data); void readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno); }; static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n"; static CNCB tunnelConnectDone; static ERCB tunnelErrorComplete; static CLCB tunnelServerClosed; static CLCB tunnelClientClosed; static CTCB 
tunnelTimeout; static PSC tunnelPeerSelectComplete; static void tunnelConnected(const Comm::ConnectionPointer &server, void *); static void tunnelRelayConnectRequest(const Comm::ConnectionPointer &server, void *); static void tunnelServerClosed(const CommCloseCbParams &params) { TunnelStateData *tunnelState = (TunnelStateData *)params.data; debugs(26, 3, HERE << tunnelState->server.conn); tunnelState->server.conn = NULL; if (tunnelState->noConnections()) { delete tunnelState; return; } - if (!tunnelState->server.len) { + if (tunnelState->server.buf.isEmpty()) { tunnelState->client.conn->close(); return; } } static void tunnelClientClosed(const CommCloseCbParams &params) { TunnelStateData *tunnelState = (TunnelStateData *)params.data; debugs(26, 3, HERE << tunnelState->client.conn); tunnelState->client.conn = NULL; if (tunnelState->noConnections()) { delete tunnelState; return; } - if (!tunnelState->client.len) { + if (tunnelState->client.buf.isEmpty()) { tunnelState->server.conn->close(); return; } } TunnelStateData::TunnelStateData() : url(NULL), http(), request(NULL), status_ptr(NULL), connectRespBuf(NULL), connectReqWriting(false) { debugs(26, 3, "TunnelStateData constructed this=" << this); } TunnelStateData::~TunnelStateData() { debugs(26, 3, "TunnelStateData destructed this=" << this); assert(noConnections()); xfree(url); serverDestinations.clear(); delete connectRespBuf; } -TunnelStateData::Connection::~Connection() -{ - safe_free(buf); -} - int TunnelStateData::Connection::bytesWanted(int lowerbound, int upperbound) const { #if USE_DELAY_POOLS return delayId.bytesWanted(lowerbound, upperbound); #else return upperbound; #endif } -void -TunnelStateData::Connection::bytesIn(int const &count) -{ - debugs(26, 3, HERE << "len=" << len << " + count=" << count); -#if USE_DELAY_POOLS - delayId.bytesIn(count); -#endif - - len += count; -} - int TunnelStateData::Connection::debugLevelForError(int const xerrno) const { #ifdef ECONNRESET if (xerrno == ECONNRESET) return 2;
#endif if (ignoreErrno(xerrno)) return 3; return 1; } /* Read from server side and queue it for writing to the client */ void TunnelStateData::ReadServer(const Comm::ConnectionPointer &c, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert(cbdataReferenceValid(tunnelState)); debugs(26, 3, HERE << c); - tunnelState->readServer(buf, len, errcode, xerrno); + tunnelState->readServer(len, errcode, xerrno); } void -TunnelStateData::readServer(char *buf, size_t len, comm_err_t errcode, int xerrno) +TunnelStateData::readServer(size_t len, comm_err_t errcode, int xerrno) { debugs(26, 3, HERE << server.conn << ", read " << len << " bytes, err=" << errcode); /* * Bail out early on COMM_ERR_CLOSING * - close handlers will tidy up for us */ if (errcode == COMM_ERR_CLOSING) return; if (len > 0) { - server.bytesIn(len); +#if USE_DELAY_POOLS + server.delayId.bytesIn(len); +#endif kb_incr(&(statCounter.server.all.kbytes_in), len); kb_incr(&(statCounter.server.other.kbytes_in), len); } if (keepGoingAfterRead(len, errcode, xerrno, server, client)) - copy(len, server, client, WriteClientDone); + copy(server, client, WriteClientDone); } /// Called when we read [a part of] CONNECT response from the peer void TunnelStateData::readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno) { debugs(26, 3, server.conn << ", read " << len << " bytes, err=" << errcode); assert(waitingForConnectResponse()); if (errcode == COMM_ERR_CLOSING) return; if (len > 0) { connectRespBuf->appended(len); - server.bytesIn(len); +#if USE_DELAY_POOLS + server.delayId.bytesIn(len); +#endif kb_incr(&(statCounter.server.all.kbytes_in), len); kb_incr(&(statCounter.server.other.kbytes_in), len); } if (keepGoingAfterRead(len, errcode, xerrno, server, client)) handleConnectResponse(len); } /* Read from client side and queue it for writing to the server */ void TunnelStateData::ReadConnectResponseDone(const 
Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert (cbdataReferenceValid (tunnelState)); tunnelState->readConnectResponseDone(buf, len, errcode, xerrno); } /// Parses [possibly incomplete] CONNECT response and reacts to it. /// If the tunnel is being closed or more response data is needed, returns false. @@ -417,169 +393,167 @@ server.logicError("huge CONNECT response from peer"); return; } // keep reading readConnectResponse(); return; } // CONNECT response was successfully parsed *status_ptr = rep.sline.status(); // bail if we did not get an HTTP 200 (Connection Established) response if (rep.sline.status() != Http::scOkay) { server.logicError("unsupported CONNECT response status code"); return; } if (rep.hdr_sz < connectRespBuf->contentSize()) { // preserve bytes that the server already sent after the CONNECT response - server.len = connectRespBuf->contentSize() - rep.hdr_sz; - memcpy(server.buf, connectRespBuf->content()+rep.hdr_sz, server.len); - } else { - // reset; delay pools were using this field to throttle CONNECT response - server.len = 0; + server.buf.append(connectRespBuf->content()+rep.hdr_sz, connectRespBuf->contentSize() - rep.hdr_sz); } delete connectRespBuf; connectRespBuf = NULL; connectExchangeCheckpoint(); } void TunnelStateData::Connection::logicError(const char *errMsg) { debugs(50, 3, conn << " closing on error: " << errMsg); conn->close(); } void TunnelStateData::Connection::error(int const xerrno) { /* XXX fixme xstrerror and xerrno... 
*/ errno = xerrno; debugs(50, debugLevelForError(xerrno), HERE << conn << ": read/write failure: " << xstrerror()); if (!ignoreErrno(xerrno)) conn->close(); } /* Read from client side and queue it for writing to the server */ void TunnelStateData::ReadClient(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; assert (cbdataReferenceValid (tunnelState)); - tunnelState->readClient(buf, len, errcode, xerrno); + tunnelState->readClient(len, errcode, xerrno); } void -TunnelStateData::readClient(char *buf, size_t len, comm_err_t errcode, int xerrno) +TunnelStateData::readClient(size_t len, comm_err_t errcode, int xerrno) { debugs(26, 3, HERE << client.conn << ", read " << len << " bytes, err=" << errcode); /* * Bail out early on COMM_ERR_CLOSING * - close handlers will tidy up for us */ if (errcode == COMM_ERR_CLOSING) return; if (len > 0) { - client.bytesIn(len); +#if USE_DELAY_POOLS + client.delayId.bytesIn(len); +#endif kb_incr(&(statCounter.client_http.kbytes_in), len); } if (keepGoingAfterRead(len, errcode, xerrno, client, server)) - copy(len, client, server, WriteServerDone); + copy(client, server, WriteServerDone); } /// Updates state after reading from client or server. /// Returns whether the caller should use the data just read. 
 bool
 TunnelStateData::keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to)
 {
     debugs(26, 3, HERE << "from={" << from.conn << "}, to={" << to.conn << "}");

     /* I think this is to prevent free-while-in-a-callback behaviour
      * - RBC 20030229
      * from.conn->close() / to.conn->close() done here trigger close callbacks which may free TunnelStateData
      */
     const CbcPointer<TunnelStateData> safetyLock(this);

     /* Bump the source connection read timeout on any activity */
     if (Comm::IsConnOpen(from.conn)) {
         AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                          CommTimeoutCbPtrFun(tunnelTimeout, this));
         commSetConnTimeout(from.conn, Config.Timeout.read, timeoutCall);
     }

     /* Bump the dest connection read timeout on any activity */
     /* see Bug 3659: tunnels can be weird, with very long one-way transfers */
     if (Comm::IsConnOpen(to.conn)) {
         AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                          CommTimeoutCbPtrFun(tunnelTimeout, this));
         commSetConnTimeout(to.conn, Config.Timeout.read, timeoutCall);
     }

     if (errcode)
         from.error (xerrno);
     else if (len == 0 || !Comm::IsConnOpen(to.conn)) {
         debugs(26, 3, HERE << "Nothing to write or client gone. Terminate the tunnel.");
         from.conn->close();

         /* Only close the remote end if we've finished queueing data to it */
-        if (from.len == 0 && Comm::IsConnOpen(to.conn) ) {
+        if (from.buf.isEmpty() && Comm::IsConnOpen(to.conn)) {
             to.conn->close();
         }
     } else if (cbdataReferenceValid(this)) {
         return true;
     }

     return false;
 }

 void
-TunnelStateData::copy(size_t len, Connection &from, Connection &to, IOCB *completion)
+TunnelStateData::copy(Connection &from, Connection &to, IOCB *completion)
 {
     debugs(26, 3, HERE << "Schedule Write");
     AsyncCall::Pointer call = commCbCall(5,5, "TunnelBlindCopyWriteHandler",
                                          CommIoCbPtrFun(completion, this));
-    Comm::Write(to.conn, from.buf, len, call, NULL);
+    Comm::Write(to.conn, &from.buf, call);
 }

 /* Writes data from the client buffer to the server side */
 void
 TunnelStateData::WriteServerDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));

-    tunnelState->writeServerDone(buf, len, flag, xerrno);
+    tunnelState->writeServerDone(len, flag, xerrno);
 }

 void
-TunnelStateData::writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno)
+TunnelStateData::writeServerDone(size_t len, comm_err_t flag, int xerrno)
 {
     debugs(26, 3, HERE << server.conn << ", " << len << " bytes written, flag=" << flag);

     /* Error? */
     if (flag != COMM_OK) {
         if (flag != COMM_ERR_CLOSING) {
             debugs(26, 4, HERE << "calling TunnelStateData::server.error(" << xerrno <<")");
             server.error(xerrno); // may call comm_close
         }
         return;
     }

     /* EOF? */
     if (len == 0) {
         debugs(26, 4, HERE << "No read input. Closing server connection.");
         server.conn->close();
         return;
     }

     /* Valid data */
@@ -590,57 +564,55 @@
     /* If the other end has closed, so should we */
     if (!Comm::IsConnOpen(client.conn)) {
         debugs(26, 4, HERE << "Client gone away. Shutting down server connection.");
         server.conn->close();
         return;
     }

     const CbcPointer<TunnelStateData> safetyLock(this); /* ??? should be locked by the caller... */

     if (cbdataReferenceValid(this))
         copyRead(client, ReadClient);
 }

 /* Writes data from the server buffer to the client side */
 void
 TunnelStateData::WriteClientDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));

-    tunnelState->writeClientDone(buf, len, flag, xerrno);
+    tunnelState->writeClientDone(len, flag, xerrno);
 }

 void
 TunnelStateData::Connection::dataSent(size_t amount)
 {
-    debugs(26, 3, HERE << "len=" << len << " - amount=" << amount);
-    assert(amount == (size_t)len);
-    len =0;
-    /* increment total object size */
+    debugs(26, 3, amount << " bytes written");
+    /* increment total object size */

     if (size_ptr)
         *size_ptr += amount;
 }

 void
-TunnelStateData::writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno)
+TunnelStateData::writeClientDone(size_t len, comm_err_t flag, int xerrno)
 {
     debugs(26, 3, HERE << client.conn << ", " << len << " bytes written, flag=" << flag);

     /* Error? */
     if (flag != COMM_OK) {
         if (flag != COMM_ERR_CLOSING) {
             debugs(26, 4, HERE << "Closing client connection due to comm flags.");
             client.error(xerrno); // may call comm_close
         }
         return;
     }

     /* EOF? */
     if (len == 0) {
         debugs(26, 4, HERE << "Closing client connection due to 0 byte read.");
         client.conn->close();
         return;
     }

     /* Valid data */
@@ -665,86 +637,86 @@
 {
     TunnelStateData *tunnelState = static_cast<TunnelStateData *>(io.data);
     debugs(26, 3, HERE << io.conn);
     /* Temporary lock to protect our own feets (comm_close -> tunnelClientClosed -> Free) */
     CbcPointer<TunnelStateData> safetyLock(tunnelState);

     tunnelState->client.closeIfOpen();
     tunnelState->server.closeIfOpen();
 }

 void
 TunnelStateData::Connection::closeIfOpen()
 {
     if (Comm::IsConnOpen(conn))
         conn->close();
 }

 void
 TunnelStateData::copyRead(Connection &from, IOCB *completion)
 {
-    assert(from.len == 0);
     AsyncCall::Pointer call = commCbCall(5,4, "TunnelBlindCopyReadHandler",
                                          CommIoCbPtrFun(completion, this));
-    comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), call);
+    from.buf.reserveSpace(from.bytesWanted(1, SQUID_TCP_SO_RCVBUF));
+    comm_read(from.conn, from.buf, call);
 }

 void
 TunnelStateData::readConnectResponse()
 {
     assert(waitingForConnectResponse());

     AsyncCall::Pointer call = commCbCall(5,4, "readConnectResponseDone",
                                          CommIoCbPtrFun(ReadConnectResponseDone, this));
     comm_read(server.conn, connectRespBuf->space(),
               server.bytesWanted(1, connectRespBuf->spaceSize()), call);
 }

 /**
  * Set the HTTP status for this request and sets the read handlers for client
  * and server side connections.
  */
 static void
 tunnelStartShoveling(TunnelStateData *tunnelState)
 {
     assert(!tunnelState->waitingForConnectExchange());
     *tunnelState->status_ptr = Http::scOkay;
     if (cbdataReferenceValid(tunnelState)) {

         // Shovel any payload already pushed into reply buffer by the server response
-        if (!tunnelState->server.len)
+        if (tunnelState->server.buf.isEmpty())
             tunnelState->copyRead(tunnelState->server, TunnelStateData::ReadServer);
         else {
-            debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" << Raw("", tunnelState->server.buf, tunnelState->server.len) << "\n----------");
-            tunnelState->copy(tunnelState->server.len, tunnelState->server, tunnelState->client, TunnelStateData::WriteClientDone);
+            debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" << tunnelState->server.buf << "\n----------");
+            tunnelState->copy(tunnelState->server, tunnelState->client, TunnelStateData::WriteClientDone);
         }

         // Bug 3371: shovel any payload already pushed into ConnStateData by the client request
         if (tunnelState->http.valid() && tunnelState->http->getConn() && !tunnelState->http->getConn()->in.buf.isEmpty()) {
             struct ConnStateData::In *in = &tunnelState->http->getConn()->in;
             debugs(26, DBG_DATA, "Tunnel client PUSH Payload: \n" << in->buf << "\n----------");

             // We just need to ensure the bytes from ConnStateData are in client.buf already to deliver
-            memcpy(tunnelState->client.buf, in->buf.rawContent(), in->buf.length());
-            // NP: readClient() takes care of buffer length accounting.
-            tunnelState->readClient(tunnelState->client.buf, in->buf.length(), COMM_OK, 0);
+            tunnelState->client.buf = in->buf;
             in->buf.consume(); // ConnStateData buffer accounting after the shuffle.
+            // NP: readClient() takes care of buffer length accounting.
+            tunnelState->readClient(tunnelState->client.buf.length(), COMM_OK, 0);
         } else
             tunnelState->copyRead(tunnelState->client, TunnelStateData::ReadClient);
     }
 }

 /**
  * All the pieces we need to write to client and/or server connection
  * have been written.
  * Call the tunnelStartShoveling to start the blind pump.
  */
 static void
 tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     debugs(26, 3, HERE << conn << ", flag=" << flag);

     if (flag != COMM_OK) {
         *tunnelState->status_ptr = Http::scInternalServerError;
         tunnelErrorComplete(conn->fd, data, 0);
         return;
@@ -838,41 +810,41 @@
             Comm::ConnOpener *cs = new Comm::ConnOpener(tunnelState->serverDestinations[0], call, Config.Timeout.connect);
             cs->setHost(tunnelState->url);
             AsyncJob::Start(cs);
         } else {
             debugs(26, 4, HERE << "terminate with error.");
             ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, Http::scServiceUnavailable, tunnelState->request.getRaw());
             *tunnelState->status_ptr = Http::scServiceUnavailable;
             err->xerrno = xerrno;
             // on timeout is this still: err->xerrno = ETIMEDOUT;
             err->port = conn->remote.port();
             err->callback = tunnelErrorComplete;
             err->callback_data = tunnelState;
             errorSend(tunnelState->client.conn, err);
         }
         return;
     }

 #if USE_DELAY_POOLS
     /* no point using the delayIsNoDelay stuff since tunnel is nice and simple */
     if (conn->getPeer() && conn->getPeer()->options.no_delay)
-        tunnelState->server.setDelayId(DelayId());
+        tunnelState->server.delayId = DelayId();
 #endif

     tunnelState->request->hier.note(conn, tunnelState->getHost());

     tunnelState->server.conn = conn;
     tunnelState->request->peer_host = conn->getPeer() ? conn->getPeer()->host : NULL;
     comm_add_close_handler(conn->fd, tunnelServerClosed, tunnelState);

     debugs(26, 4, HERE << "determine post-connect handling pathway.");
     if (conn->getPeer()) {
         tunnelState->request->peer_login = conn->getPeer()->login;
         tunnelState->request->flags.proxying = !(conn->getPeer()->options.originserver);
     } else {
         tunnelState->request->peer_login = NULL;
         tunnelState->request->flags.proxying = false;
     }

     if (tunnelState->request->flags.proxying)
         tunnelState->connectToPeer();
     else {
@@ -906,41 +878,41 @@
          * default is to allow.
          */
         ACLFilledChecklist ch(Config.accessList.miss, request, NULL);
         ch.src_addr = request->client_addr;
         ch.my_addr = request->my_addr;
         if (ch.fastCheck() == ACCESS_DENIED) {
             debugs(26, 4, HERE << "MISS access forbidden.");
             err = new ErrorState(ERR_FORWARDING_DENIED, Http::scForbidden, request);
             *status_ptr = Http::scForbidden;
             errorSend(http->getConn()->clientConnection, err);
             return;
         }
     }

     debugs(26, 3, request->method << ' ' << url << ' ' << request->http_ver);
     ++statCounter.server.all.requests;
     ++statCounter.server.other.requests;

     tunnelState = new TunnelStateData;
 #if USE_DELAY_POOLS
-    tunnelState->server.setDelayId(DelayId::DelayClient(http));
+    tunnelState->server.delayId = DelayId::DelayClient(http);
 #endif
     tunnelState->url = xstrdup(url);
     tunnelState->request = request;
     tunnelState->server.size_ptr = size_ptr;
     tunnelState->status_ptr = status_ptr;
     tunnelState->client.conn = http->getConn()->clientConnection;
     tunnelState->http = http;
     tunnelState->al = al;

     comm_add_close_handler(tunnelState->client.conn->fd,
                            tunnelClientClosed,
                            tunnelState);

     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                      CommTimeoutCbPtrFun(tunnelTimeout, tunnelState));
     commSetConnTimeout(tunnelState->client.conn, Config.Timeout.lifetime, timeoutCall);

     peerSelect(&(tunnelState->serverDestinations), request, al,
                NULL,
                tunnelPeerSelectComplete,
@@ -1063,29 +1035,20 @@
     delete err;

     GetMarkingsToServer(tunnelState->request.getRaw(), *tunnelState->serverDestinations[0]);

     debugs(26, 3, HERE << "paths=" << peer_paths->size() << ", p[0]={" << (*peer_paths)[0] << "}, serverDest[0]={" <<
            tunnelState->serverDestinations[0] << "}");

     AsyncCall::Pointer call = commCbCall(26,3, "tunnelConnectDone", CommConnectCbPtrFun(tunnelConnectDone, tunnelState));
     Comm::ConnOpener *cs = new Comm::ConnOpener(tunnelState->serverDestinations[0], call, Config.Timeout.connect);
     cs->setHost(tunnelState->url);
     AsyncJob::Start(cs);
 }

 CBDATA_CLASS_INIT(TunnelStateData);

 bool
 TunnelStateData::noConnections() const
 {
     return !Comm::IsConnOpen(server.conn) && !Comm::IsConnOpen(client.conn);
 }
-
-#if USE_DELAY_POOLS
-void
-TunnelStateData::Connection::setDelayId(DelayId const &newDelay)
-{
-    delayId = newDelay;
-}
-
-#endif
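
For anyone following the "client.buf = in->buf; in->buf.consume();" change in tunnelStartShoveling(): a minimal sketch of why that hand-off should be safe, assuming the buffer type is a cheap view onto a shared, ref-counted backing store. MiniBuf below is a hypothetical stand-in written only for illustration; it is not Squid's SBuf and models nothing beyond assignment, consume(), isEmpty() and length().

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>

// Hypothetical stand-in for illustration only; this is NOT Squid's SBuf.
// Each MiniBuf is an (offset, length) view onto a ref-counted store.
class MiniBuf
{
public:
    MiniBuf() : store_(std::make_shared<std::string>()), off_(0), len_(0) {}
    explicit MiniBuf(const std::string &s)
        : store_(std::make_shared<std::string>(s)), off_(0), len_(s.size()) {}

    bool isEmpty() const { return len_ == 0; }
    std::size_t length() const { return len_; }

    // Drop up to n bytes from the front of this view only.
    // The shared backing store is left untouched.
    void consume(std::size_t n = std::string::npos) {
        if (n > len_)
            n = len_;
        off_ += n;
        len_ -= n;
    }

    // Copy out the bytes currently visible through this view.
    std::string toString() const { return store_->substr(off_, len_); }

    // True when both views reference the same backing store.
    bool sharesStoreWith(const MiniBuf &o) const { return store_ == o.store_; }

private:
    std::shared_ptr<std::string> store_; // ref-counted backing store
    std::size_t off_;                    // start of this view in the store
    std::size_t len_;                    // number of bytes in this view
};
```

Under that assumption, assigning one buffer to another copies only the view, and consuming the source afterwards empties the source view without disturbing the bytes the destination still refers to. That is the property the memcpy() replacement relies on.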