On 13/05/2014 10:46 a.m., Alex Rousskov wrote:
> On 05/11/2014 11:16 AM, Amos Jeffries wrote:
> 
>> This patch adds a Comm::Write API for accepting SBuf output buffers.
> 
> 
>> Unlike the char* API the buffer can be appended to before write(2) is
>> completed and the appended bytes will be handled as part of the original
>> write request.
> 
> That approach scares me because the caller has to be extra careful not
> to leave the being-updated-area of the buffer in inconsistent state
> across async calls. However, I doubt we have such complex callers today,
> and the approach also reduces the number of usually pointless write
> callback calls, so I am willing to experiment.

If you have a better approach I would like to hear it.

The only one I can think of is possibly Ref-Counting the SBuf instance
itself. Adding that capability though seems to be implying that
SBuf::Pointer in general is better than just passing SBuf references
around (its not).


The raw-pointer usage which appears to be scaring you is the existing
status quo anyway...

 * char* API passes a raw pointer to the store/buffer. Any alterations
of the content are racing the actual write(2) call for those bytes,
appends are lost on callback, and truncate are not seen by the write(2).
Corrupted output one way or another.
  => requirement that the caller do nothing other than allocate a new
buffer while write is scheduled. Buffer may be re-used after the callback.

 * MemBuf API passes a raw pointer to its _internal_ data store/buffer.
It goes further by passing the store deallocator. Any alterations of the
content are racing the actual write(2) call for those bytes, appends are
either lost on callback or reallocate the backing store for the caller,
and truncate are not seen by the write(2). All operations are
potentially done on a buffer pointer already deallocated by comm.
  => requirement that the caller do nothing other than MemBuf::init()
while write is scheduled (ie allocate a new buffer). Same condition
applies even after callback (ouch).

 * SBuf API passes a reference/raw-pointer to the smart pointer SBuf
instance. All operations on the MemBlob backing store remain valid.
  => requirement that the caller do nothing to the SBuf instance other
than allocate a new one while write is scheduled.
   ==> Note this is the same requirement as made on both other API
already. But no longer restricted on what can be done with the backing
store by the caller. ie it can continue to use the SBuf it has for
regular SBuf operations.


Since these SBuf are members of a caller Job (or equivalent state
object) instead of a dynamically allocated/deallocated SBuf object that
seems a reasonable requirement.


> 
>> All other behaviour is identical between the three Comm::Write APIs.
> 
> You forgot to mention the fact that the new Write API, unlike the
> existing two, relies on the caller to keep the buffer area alive. That
> requirement, IMO, is a show-stopper. I strongly object to that change.

If by "buffer area" you mean MemBlob that is not a requirement. It can
have anything at all done to it during Comm::Write. If it is cleared the
write will probably return 0 bytes written to the callback. Most
callbacks assume that means the connection has terminated unexpectedly.
So its not advisable, but possible without any internal problems to comm.

If by "buffer area" you means SBuf. See above comparison with the other
two APIs.

I think that is reasonable to retain that requirement. Especially since
it only applies on the small SBuf instance.
 One either sends an SBuf which is a member of the Job/*StateData object
itself or an SBuf member of the Params object which is delivered to the
callback. Both methods ensure its alive until at least the callback is
scheduled.


If/when we start delivering lists of small SBuf objects that is a whole
other API needed for SBufList. OR perhapse a Job loop writing each SBuf
in the list individually ... with the Job doing this keeping the
relevant SBuf instance alive.

> 
> The buf2 hack has already been added during unaudited "Convert the
> ConnStateData input buffer to SBuf" commit (r13324), so there is little
> I can do about it until it causes a bug, but I do object to the use of
> such raw/unprotected links between Comm and its callers in any code I am
> lucky to review before the commit.
> 
> If others overrule this objection, please at least change the
> Comm::Write() sb argument type from reference to a pointer. Passing an
> argument by reference (implying temporary use) and then taking an
> address of that argument for long term object storage is wrong.

Fine.

> 
>>     /// Buffer to store read(2) into when set.
>>     // This is a pointer to the Jobs buffer rather than an SBuf using
>>     // the same store since we cannot know when or how the Job will
>>     // alter its SBuf while we are reading.
>>     SBuf *buf2;
>>
>>     // Legacy c-string buffers used when buf2 is unset.
>>     char *buf;
> 
> 
> Another side-effect is that "I wrote this, what's next?" notifications
> may now be delayed for a very long time (potentially forever) if the
> caller keeps adding content to the buffer. I suspect that most writers
> do not really need such notifications. Thank you for documenting this
> aspect (although it deserves to be mentioned in the commit message as
> well IMO).


There is no code doing this behaviour at present. I trust that we have
enough quality on new code now to ensure that its only taken advantage
of when appropriate.

For now the (offset > size) check for re-scheduling is still in effect.
It is just that up to TCP buffer amount more bytes can be written if
they are appended early. The callback will be delivered the accurate
written amount and teh buffer consumed() correctly either way.

I've changed the "will" to "may" to avoid implying that the indefinite
wait/hang situation.

In this round I have also added a check to ensure that caller using
consume() does not cause hanging. But there are worse logic problems in
callers doing that inappropriately.


> 
>> -    static void ReadClient(const Comm::ConnectionPointer &, char *buf, 
>> size_t len, comm_err_t errcode, int xerrno, void *data);
>> +    static void ReadClient(const Comm::ConnectionPointer &, char *, size_t 
>> len, comm_err_t errcode, int xerrno, void *data);
> 
> Please avoid this and similar non-changes to reduce merge conflicts. It
> is OK to remove the parameter name in the method _definition_ where it
> was used and is no longer needed(**), but there is no need to update
> methods that did not use the parameter or the declarations. These
> methods should be refactored/polished when TunnelStateData becomes a
> job, but that is outside your patch scope.
> 
> (**) although the existing code like TunnelStateData::readServer()
> implies that such changes are not required either.
> 

Fine.

> 
>> -        void bytesIn(int const &);
>> -#if USE_DELAY_POOLS
>> -
>> -        void setDelayId(DelayId const &);
>> -#endif
>> -
>>          void error(int const xerrno);
>>          int debugLevelForError(int const xerrno) const;
>>          /// handles a non-I/O error associated with this Connection
>>          void logicError(const char *errMsg);
>>          void closeIfOpen();
>>          void dataSent (size_t amount);
> ...
>>          int64_t *size_ptr;          /* pointer to size in an ConnStateData 
>> for logging */
>>  
>>          Comm::ConnectionPointer conn;    ///< The currently connected 
>> connection.
>>  
>> -    private:
>>  #if USE_DELAY_POOLS
>> -
>>          DelayId delayId;
>>  #endif
>> -
> 
>> -        server.bytesIn(len);
>> +#if USE_DELAY_POOLS
>> +        server.delayId.bytesIn(len);
>> +#endif
> 
>> -        tunnelState->server.setDelayId(DelayId());
>> +        tunnelState->server.delayId = DelayId();
> 
> Please avoid these and similar changes if they are unrelated to your
> patch scope.
> 

With the removal of client.len/server.len byte accounting the
delayId.bytesIn() is now used directly for delay pool byte accounting.
Which makes the delayId wrappers of all types needless.

> 
>>      if (rep.hdr_sz < connectRespBuf->contentSize()) {
>>          // preserve bytes that the server already sent after the CONNECT 
>> response
>> -        server.len = connectRespBuf->contentSize() - rep.hdr_sz;
>> -        memcpy(server.buf, connectRespBuf->content()+rep.hdr_sz, 
>> server.len);
>> -    } else {
>> -        // reset; delay pools were using this field to throttle CONNECT 
>> response
>> -        server.len = 0;
>> +        server.buf.append(connectRespBuf->content()+rep.hdr_sz, 
>> connectRespBuf->contentSize() - rep.hdr_sz);
>>      }
> 
> Can you explain why the "else" clause is no longer needed? Or was it
> bogus to start with?
> 

It was accumulating value due to server.bytesIn(len) being used as
access to delayId_ from readConnectResponseDone().

We now use server.delayId.bytesIn(len) in both the read callbacks and
the else case became bogus.

> 
>> -    if (!tunnelState->server.len) {
>> +    if (tunnelState->server.buf.isEmpty()) {
> 
> and
> 
>> -    if (!tunnelState->client.len) {
>> +    if (tunnelState->client.buf.isEmpty()) {
> 
> but
> 
>> -        if (!tunnelState->server.len)
>> +        if (!tunnelState->server.buf.length())
> 
> Please be consistent: isEmpty() is probably slightly better than
> !length(), but it is your call which method to use.
> 

Fixed.

> 
> 
>> -            debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" << 
>> Raw("", tunnelState->server.buf, tunnelState->server.len) << "\n----------");
>> +            debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" << 
>> tunnelState->server.buf << "\n----------");
> 
> Please do not remove Raw() protection for dumping raw data.
> 

SBuf provides generic operator<<(std::ostream&) for debugs.

If Raw() protection is required on the buffer it needs to be provided by
that. There are a number of places dumping SBuf contents to debugs()
already. Will do as a followup if necessary.

Amos
=== modified file 'src/comm/Write.cc'
--- src/comm/Write.cc   2014-02-21 10:46:19 +0000
+++ src/comm/Write.cc   2014-05-13 08:57:09 +0000
@@ -1,118 +1,145 @@
 #include "squid.h"
 #include "comm/Connection.h"
 #include "comm/IoCallback.h"
 #include "comm/Write.h"
 #include "fd.h"
 #include "fde.h"
 #include "globals.h"
 #include "MemBuf.h"
 #include "profiler/Profiler.h"
+#include "SBuf.h"
 #include "SquidTime.h"
 #include "StatCounters.h"
 #if USE_DELAY_POOLS
 #include "ClientInfo.h"
 #endif
 
 #include <cerrno>
 
 void
+Comm::Write(const Comm::ConnectionPointer &conn, SBuf *sb, AsyncCall::Pointer 
&callback)
+{
+    debugs(5, 5, conn << ": sz " << sb->length() << ": asynCall " << callback);
+
+    /* Make sure we are open, not closing, and not writing */
+    assert(fd_table[conn->fd].flags.open);
+    assert(!fd_table[conn->fd].closing());
+    Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd);
+    assert(!ccb->active());
+
+    fd_table[conn->fd].writeStart = squid_curtime;
+    ccb->conn = conn;
+    /* Queue the write */
+    ccb->setCallback(IOCB_WRITE, callback, NULL, NULL, sb->length());
+    ccb->buf2 = sb;
+    ccb->selectOrQueueWrite();
+}
+
+/// \deprecated use SBuf for I/O buffer instead
+void
 Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, 
AsyncCall::Pointer &callback)
 {
     Comm::Write(conn, mb->buf, mb->size, callback, mb->freeFunc());
 }
 
+/// \deprecated use SBuf for I/O buffer instead
 void
 Comm::Write(const Comm::ConnectionPointer &conn, const char *buf, int size, 
AsyncCall::Pointer &callback, FREE * free_func)
 {
     debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback);
 
     /* Make sure we are open, not closing, and not writing */
     assert(fd_table[conn->fd].flags.open);
     assert(!fd_table[conn->fd].closing());
     Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd);
     assert(!ccb->active());
 
     fd_table[conn->fd].writeStart = squid_curtime;
     ccb->conn = conn;
     /* Queue the write */
     ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size);
     ccb->selectOrQueueWrite();
 }
 
 /** Write to FD.
  * This function is used by the lowest level of IO loop which only has access 
to FD numbers.
  * We have to use the comm iocb_table to map FD numbers to waiting data and 
Comm::Connections.
  * Once the write has been concluded we schedule the waiting call with 
success/fail results.
  */
 void
 Comm::HandleWrite(int fd, void *data)
 {
     Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data);
     int len = 0;
     int nleft;
 
     assert(state->conn != NULL && state->conn->fd == fd);
 
     PROF_start(commHandleWrite);
-    debugs(5, 5, HERE << state->conn << ": off " <<
-           (long int) state->offset << ", sz " << (long int) state->size << 
".");
 
-    nleft = state->size - state->offset;
+    debugs(5, 5, state->conn << ": off " << state->offset << ", sz " << 
state->size);
+    if (state->buf2)
+        nleft = state->buf2->length();
+    else
+        nleft = state->size - state->offset;
 
 #if USE_DELAY_POOLS
     ClientInfo * clientInfo=fd_table[fd].clientInfo;
 
     if (clientInfo && !clientInfo->writeLimitingActive)
         clientInfo = NULL; // we only care about quota limits here
 
     if (clientInfo) {
         assert(clientInfo->selectWaiting);
         clientInfo->selectWaiting = false;
 
         assert(clientInfo->hasQueue());
         assert(clientInfo->quotaPeekFd() == fd);
         clientInfo->quotaDequeue(); // we will write or requeue below
 
         if (nleft > 0) {
             const int quota = clientInfo->quotaForDequed();
             if (!quota) {  // if no write quota left, queue this fd
                 state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
                 clientInfo->kickQuotaQueue();
                 PROF_stop(commHandleWrite);
                 return;
             }
 
             const int nleft_corrected = min(nleft, quota);
             if (nleft != nleft_corrected) {
                 debugs(5, 5, HERE << state->conn << " writes only " <<
                        nleft_corrected << " out of " << nleft);
                 nleft = nleft_corrected;
             }
 
         }
     }
 #endif /* USE_DELAY_POOLS */
 
     /* actually WRITE data */
-    len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
-    debugs(5, 5, HERE << "write() returns " << len);
+    if (state->buf2)
+        len = FD_WRITE_METHOD(fd, state->buf2->rawContent(), nleft);
+    else
+        len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
+    debugs(5, 5, "write() returns " << len);
 
 #if USE_DELAY_POOLS
     if (clientInfo) {
         if (len > 0) {
             /* we wrote data - drain them from bucket */
             clientInfo->bucketSize -= len;
             if (clientInfo->bucketSize < 0.0) {
                 debugs(5, DBG_IMPORTANT, HERE << "drained too much"); // 
should not happen
                 clientInfo->bucketSize = 0;
             }
         }
 
         // even if we wrote nothing, we were served; give others a chance
         clientInfo->kickQuotaQueue();
     }
 #endif /* USE_DELAY_POOLS */
 
     fd_bytes(fd, len, FD_WRITE);
     ++statCounter.syscalls.sock.writes;
     // After each successful partial write,
@@ -125,30 +152,36 @@
         if (nleft != 0)
             debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: 
connection closed with " << nleft << " bytes remaining.");
 
         state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
     } else if (len < 0) {
         /* An error */
         if (fd_table[fd].flags.socket_eof) {
             debugs(50, 2, HERE << "FD " << fd << " write failure: " << 
xstrerror() << ".");
             state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
         } else if (ignoreErrno(errno)) {
             debugs(50, 9, HERE << "FD " << fd << " write failure: " << 
xstrerror() << ".");
             state->selectOrQueueWrite();
         } else {
             debugs(50, 2, HERE << "FD " << fd << " write failure: " << 
xstrerror() << ".");
             state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
         }
     } else {
         /* A successful write, continue */
         state->offset += len;
 
+        if (state->buf2) {
+            state->buf2->consume(len);
+            if (state->buf2->isEmpty())
+                state->finish(nleft ? COMM_OK : COMM_ERROR, errno);
+        }
+
         if (state->offset < state->size) {
             /* Not done, reinstall the write handler and write some more */
             state->selectOrQueueWrite();
         } else {
             state->finish(nleft ? COMM_OK : COMM_ERROR, errno);
         }
     }
 
     PROF_stop(commHandleWrite);
 }

=== modified file 'src/comm/Write.h'
--- src/comm/Write.h    2012-08-14 11:53:07 +0000
+++ src/comm/Write.h    2014-05-13 08:56:43 +0000
@@ -1,34 +1,44 @@
 #ifndef _SQUID_COMM_IOWRITE_H
 #define _SQUID_COMM_IOWRITE_H
 
 #include "base/AsyncCall.h"
 #include "comm/forward.h"
 #include "typedefs.h"
 
 class MemBuf;
+class SBuf;
+
 namespace Comm
 {
 
 /**
  * Queue a write. callback is scheduled when the write
  * completes, on error, or on file descriptor close.
  *
  * free_func is used to free the passed buffer when the write has completed.
  */
 void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, 
AsyncCall::Pointer &callback, FREE *free_func);
 
 /**
  * Queue a write. callback is scheduled when the write
  * completes, on error, or on file descriptor close.
  */
 void Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer 
&callback);
 
+/**
+ * Queue a write for an SBuf contents. The SBuf content is consume()'d as it 
is written.
+ * callback is scheduled when the SBuf is emptied, on error, or on file 
descriptor close.
+ * If the SBuf is appended to while write(2) are ongoing some additional bytes 
may also be
+ * written before the callback is scheduled.
+ */
+void Write(const Comm::ConnectionPointer &conn, SBuf *sb, AsyncCall::Pointer 
&callback);
+
 /// Cancel the write pending on FD. No action if none pending.
 void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason);
 
 // callback handler to process an FD which is available for writing.
 extern PF HandleWrite;
 
 } // namespace Comm
 
 #endif /* _SQUID_COMM_IOWRITE_H */

=== modified file 'src/neighbors.cc'
--- src/neighbors.cc    2014-04-30 10:50:09 +0000
+++ src/neighbors.cc    2014-05-12 12:55:36 +0000
@@ -151,50 +151,50 @@
 
     return p->type;
 }
 
 /**
  * \return Whether it is appropriate to fetch REQUEST from PEER.
  */
 bool
 peerAllowedToUse(const CachePeer * p, HttpRequest * request)
 {
 
     const CachePeerDomainList *d = NULL;
     assert(request != NULL);
 
     if (neighborType(p, request) == PEER_SIBLING) {
 #if PEER_MULTICAST_SIBLINGS
         if (p->type == PEER_MULTICAST && p->options.mcast_siblings &&
                 (request->flags.noCache || request->flags.refresh || 
request->flags.loopDetected || request->flags.needValidation))
             debugs(15, 2, "peerAllowedToUse(" << p->name << ", " << 
request->GetHost() << ") : multicast-siblings optimization match");
 #endif
-        if (request->flags.noCache)
+        if (request->flags.loopDetected)
             return false;
 
-        if (request->flags.refresh)
+        if (request->flags.noCache)
             return false;
 
-        if (request->flags.loopDetected)
+        if (request->flags.refresh && p->options.allow_miss)
             return false;
 
-        if (request->flags.needValidation)
+        if (request->flags.needValidation && p->options.allow_miss)
             return false;
     }
 
     // CONNECT requests are proxy requests. Not to be forwarded to origin 
servers.
     // Unless the destination port matches, in which case we MAY perform a 
'DIRECT' to this CachePeer.
     if (p->options.originserver && request->method == Http::METHOD_CONNECT && 
request->port != p->in_addr.port())
         return false;
 
     if (p->peer_domain == NULL && p->access == NULL)
         return true;
 
     bool do_ping = false;
     for (d = p->peer_domain; d; d = d->next) {
         if (0 == matchDomainName(request->GetHost(), d->domain)) {
             do_ping = d->do_ping;
             break;
         }
 
         do_ping = !d->do_ping;
     }

=== modified file 'src/tunnel.cc'
--- src/tunnel.cc       2014-05-07 14:40:05 +0000
+++ src/tunnel.cc       2014-05-13 08:59:13 +0000
@@ -108,69 +108,57 @@
         return (server.conn != NULL && server.conn->getPeer() ? 
server.conn->getPeer()->host : request->GetHost());
     };
 
     /// Whether we are writing a CONNECT request to a peer.
     bool waitingForConnectRequest() const { return connectReqWriting; }
     /// Whether we are reading a CONNECT response from a peer.
     bool waitingForConnectResponse() const { return connectRespBuf; }
     /// Whether we are waiting for the CONNECT request/response exchange with 
the peer.
     bool waitingForConnectExchange() const { return waitingForConnectRequest() 
|| waitingForConnectResponse(); }
 
     /// Whether the client sent a CONNECT request to us.
     bool clientExpectsConnectResponse() const {
         return !(request != NULL &&
                  (request->flags.interceptTproxy || 
request->flags.intercepted));
     }
 
     class Connection
     {
 
     public:
-        Connection() : len (0), buf ((char *)xmalloc(SQUID_TCP_SO_RCVBUF)), 
size_ptr(NULL) {}
-
-        ~Connection();
+        Connection() : size_ptr(NULL) {}
 
         int bytesWanted(int lower=0, int upper = INT_MAX) const;
-        void bytesIn(int const &);
-#if USE_DELAY_POOLS
-
-        void setDelayId(DelayId const &);
-#endif
-
         void error(int const xerrno);
         int debugLevelForError(int const xerrno) const;
         /// handles a non-I/O error associated with this Connection
         void logicError(const char *errMsg);
         void closeIfOpen();
         void dataSent (size_t amount);
-        int len;
-        char *buf;
+        SBuf buf;
         int64_t *size_ptr;             /* pointer to size in an ConnStateData 
for logging */
 
         Comm::ConnectionPointer conn;    ///< The currently connected 
connection.
 
-    private:
 #if USE_DELAY_POOLS
-
         DelayId delayId;
 #endif
-
     };
 
     Connection client, server;
     int *status_ptr;           /* pointer to status for logging */
     MemBuf *connectRespBuf; ///< accumulates peer CONNECT response when we 
need it
     bool connectReqWriting; ///< whether we are writing a CONNECT request to a 
peer
 
     void copyRead(Connection &from, IOCB *completion);
 
     /// continue to set up connection to a peer, going async for SSL peers
     void connectToPeer();
 
 private:
 #if USE_OPENSSL
     /// Gives PeerConnector access to Answer in the TunnelStateData callback 
dialer.
     class MyAnswerDialer: public CallDialer, public 
Ssl::PeerConnector::CbDialer
     {
     public:
         typedef void (TunnelStateData::*Method)(Ssl::PeerConnectorAnswer &);
 
@@ -181,208 +169,196 @@
         virtual bool canDial(AsyncCall &call) { return tunnel_.valid(); }
         void dial(AsyncCall &call) { ((&(*tunnel_))->*method_)(answer_); }
         virtual void print(std::ostream &os) const {
             os << '(' << tunnel_.get() << ", " << answer_ << ')';
         }
 
         /* Ssl::PeerConnector::CbDialer API */
         virtual Ssl::PeerConnectorAnswer &answer() { return answer_; }
 
     private:
         Method method_;
         CbcPointer<TunnelStateData> tunnel_;
         Ssl::PeerConnectorAnswer answer_;
     };
 
     void connectedToPeer(Ssl::PeerConnectorAnswer &answer);
 #endif
 
     CBDATA_CLASS2(TunnelStateData);
     bool keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, 
Connection &from, Connection &to);
-    void copy(size_t len, Connection &from, Connection &to, IOCB *);
+    void copy(Connection &from, Connection &to, IOCB *);
     void handleConnectResponse(const size_t chunkSize);
-    void readServer(char *buf, size_t len, comm_err_t errcode, int xerrno);
-    void readClient(char *buf, size_t len, comm_err_t errcode, int xerrno);
-    void writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno);
-    void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno);
+    void readServer(size_t len, comm_err_t errcode, int xerrno);
+    void readClient(size_t len, comm_err_t errcode, int xerrno);
+    void writeClientDone(size_t len, comm_err_t flag, int xerrno);
+    void writeServerDone(size_t len, comm_err_t flag, int xerrno);
 
     static void ReadConnectResponseDone(const Comm::ConnectionPointer &, char 
*buf, size_t len, comm_err_t errcode, int xerrno, void *data);
     void readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, 
int xerrno);
 };
 
 static const char *const conn_established = "HTTP/1.1 200 Connection 
established\r\n\r\n";
 
 static CNCB tunnelConnectDone;
 static ERCB tunnelErrorComplete;
 static CLCB tunnelServerClosed;
 static CLCB tunnelClientClosed;
 static CTCB tunnelTimeout;
 static PSC tunnelPeerSelectComplete;
 static void tunnelConnected(const Comm::ConnectionPointer &server, void *);
 static void tunnelRelayConnectRequest(const Comm::ConnectionPointer &server, 
void *);
 
 static void
 tunnelServerClosed(const CommCloseCbParams &params)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)params.data;
     debugs(26, 3, HERE << tunnelState->server.conn);
     tunnelState->server.conn = NULL;
 
     if (tunnelState->noConnections()) {
         delete tunnelState;
         return;
     }
 
-    if (!tunnelState->server.len) {
+    if (tunnelState->server.buf.isEmpty()) {
         tunnelState->client.conn->close();
         return;
     }
 }
 
 static void
 tunnelClientClosed(const CommCloseCbParams &params)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)params.data;
     debugs(26, 3, HERE << tunnelState->client.conn);
     tunnelState->client.conn = NULL;
 
     if (tunnelState->noConnections()) {
         delete tunnelState;
         return;
     }
 
-    if (!tunnelState->client.len) {
+    if (tunnelState->client.buf.isEmpty()) {
         tunnelState->server.conn->close();
         return;
     }
 }
 
 TunnelStateData::TunnelStateData() :
         url(NULL),
         http(),
         request(NULL),
         status_ptr(NULL),
         connectRespBuf(NULL),
         connectReqWriting(false)
 {
     debugs(26, 3, "TunnelStateData constructed this=" << this);
 }
 
 TunnelStateData::~TunnelStateData()
 {
     debugs(26, 3, "TunnelStateData destructed this=" << this);
     assert(noConnections());
     xfree(url);
     serverDestinations.clear();
     delete connectRespBuf;
 }
 
-TunnelStateData::Connection::~Connection()
-{
-    safe_free(buf);
-}
-
 int
 TunnelStateData::Connection::bytesWanted(int lowerbound, int upperbound) const
 {
 #if USE_DELAY_POOLS
     return delayId.bytesWanted(lowerbound, upperbound);
 #else
 
     return upperbound;
 #endif
 }
 
-void
-TunnelStateData::Connection::bytesIn(int const &count)
-{
-    debugs(26, 3, HERE << "len=" << len << " + count=" << count);
-#if USE_DELAY_POOLS
-    delayId.bytesIn(count);
-#endif
-
-    len += count;
-}
-
 int
 TunnelStateData::Connection::debugLevelForError(int const xerrno) const
 {
 #ifdef ECONNRESET
 
     if (xerrno == ECONNRESET)
         return 2;
 
 #endif
 
     if (ignoreErrno(xerrno))
         return 3;
 
     return 1;
 }
 
 /* Read from server side and queue it for writing to the client */
 void
 TunnelStateData::ReadServer(const Comm::ConnectionPointer &c, char *buf, 
size_t len, comm_err_t errcode, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert(cbdataReferenceValid(tunnelState));
     debugs(26, 3, HERE << c);
 
-    tunnelState->readServer(buf, len, errcode, xerrno);
+    tunnelState->readServer(len, errcode, xerrno);
 }
 
 void
-TunnelStateData::readServer(char *buf, size_t len, comm_err_t errcode, int 
xerrno)
+TunnelStateData::readServer(size_t len, comm_err_t errcode, int xerrno)
 {
     debugs(26, 3, HERE << server.conn << ", read " << len << " bytes, err=" << 
errcode);
 
     /*
      * Bail out early on COMM_ERR_CLOSING
      * - close handlers will tidy up for us
      */
 
     if (errcode == COMM_ERR_CLOSING)
         return;
 
     if (len > 0) {
-        server.bytesIn(len);
+#if USE_DELAY_POOLS
+        server.delayId.bytesIn(len);
+#endif
         kb_incr(&(statCounter.server.all.kbytes_in), len);
         kb_incr(&(statCounter.server.other.kbytes_in), len);
     }
 
     if (keepGoingAfterRead(len, errcode, xerrno, server, client))
-        copy(len, server, client, WriteClientDone);
+        copy(server, client, WriteClientDone);
 }
 
 /// Called when we read [a part of] CONNECT response from the peer
 void
 TunnelStateData::readConnectResponseDone(char *buf, size_t len, comm_err_t 
errcode, int xerrno)
 {
     debugs(26, 3, server.conn << ", read " << len << " bytes, err=" << 
errcode);
     assert(waitingForConnectResponse());
 
     if (errcode == COMM_ERR_CLOSING)
         return;
 
     if (len > 0) {
         connectRespBuf->appended(len);
-        server.bytesIn(len);
+#if USE_DELAY_POOLS
+        server.delayId.bytesIn(len);
+#endif
         kb_incr(&(statCounter.server.all.kbytes_in), len);
         kb_incr(&(statCounter.server.other.kbytes_in), len);
     }
 
     if (keepGoingAfterRead(len, errcode, xerrno, server, client))
         handleConnectResponse(len);
 }
 
 /* Read from client side and queue it for writing to the server */
 void
 TunnelStateData::ReadConnectResponseDone(const Comm::ConnectionPointer &, char 
*buf, size_t len, comm_err_t errcode, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));
 
     tunnelState->readConnectResponseDone(buf, len, errcode, xerrno);
 }
 
 /// Parses [possibly incomplete] CONNECT response and reacts to it.
 /// If the tunnel is being closed or more response data is needed, returns 
false.
@@ -417,169 +393,167 @@
             server.logicError("huge CONNECT response from peer");
             return;
         }
 
         // keep reading
         readConnectResponse();
         return;
     }
 
     // CONNECT response was successfully parsed
     *status_ptr = rep.sline.status();
 
     // bail if we did not get an HTTP 200 (Connection Established) response
     if (rep.sline.status() != Http::scOkay) {
         server.logicError("unsupported CONNECT response status code");
         return;
     }
 
     if (rep.hdr_sz < connectRespBuf->contentSize()) {
         // preserve bytes that the server already sent after the CONNECT 
response
-        server.len = connectRespBuf->contentSize() - rep.hdr_sz;
-        memcpy(server.buf, connectRespBuf->content()+rep.hdr_sz, server.len);
-    } else {
-        // reset; delay pools were using this field to throttle CONNECT 
response
-        server.len = 0;
+        server.buf.append(connectRespBuf->content()+rep.hdr_sz, 
connectRespBuf->contentSize() - rep.hdr_sz);
     }
 
     delete connectRespBuf;
     connectRespBuf = NULL;
     connectExchangeCheckpoint();
 }
 
 void
 TunnelStateData::Connection::logicError(const char *errMsg)
 {
     debugs(50, 3, conn << " closing on error: " << errMsg);
     conn->close();
 }
 
 void
 TunnelStateData::Connection::error(int const xerrno)
 {
     /* XXX fixme xstrerror and xerrno... */
     errno = xerrno;
 
     debugs(50, debugLevelForError(xerrno), HERE << conn << ": read/write 
failure: " << xstrerror());
 
     if (!ignoreErrno(xerrno))
         conn->close();
 }
 
 /* Read from client side and queue it for writing to the server */
 void
 TunnelStateData::ReadClient(const Comm::ConnectionPointer &, char *buf, size_t 
len, comm_err_t errcode, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));
 
-    tunnelState->readClient(buf, len, errcode, xerrno);
+    tunnelState->readClient(len, errcode, xerrno);
 }
 
 void
-TunnelStateData::readClient(char *buf, size_t len, comm_err_t errcode, int 
xerrno)
+TunnelStateData::readClient(size_t len, comm_err_t errcode, int xerrno)
 {
     debugs(26, 3, HERE << client.conn << ", read " << len << " bytes, err=" << 
errcode);
 
     /*
      * Bail out early on COMM_ERR_CLOSING
      * - close handlers will tidy up for us
      */
 
     if (errcode == COMM_ERR_CLOSING)
         return;
 
     if (len > 0) {
-        client.bytesIn(len);
+#if USE_DELAY_POOLS
+        client.delayId.bytesIn(len);
+#endif
         kb_incr(&(statCounter.client_http.kbytes_in), len);
     }
 
     if (keepGoingAfterRead(len, errcode, xerrno, client, server))
-        copy(len, client, server, WriteServerDone);
+        copy(client, server, WriteServerDone);
 }
 
 /// Updates state after reading from client or server.
 /// Returns whether the caller should use the data just read.
 bool
 TunnelStateData::keepGoingAfterRead(size_t len, comm_err_t errcode, int 
xerrno, Connection &from, Connection &to)
 {
     debugs(26, 3, HERE << "from={" << from.conn << "}, to={" << to.conn << 
"}");
 
     /* I think this is to prevent free-while-in-a-callback behaviour
      * - RBC 20030229
      * from.conn->close() / to.conn->close() done here trigger close callbacks 
which may free TunnelStateData
      */
     const CbcPointer<TunnelStateData> safetyLock(this);
 
     /* Bump the source connection read timeout on any activity */
     if (Comm::IsConnOpen(from.conn)) {
         AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                          CommTimeoutCbPtrFun(tunnelTimeout, 
this));
         commSetConnTimeout(from.conn, Config.Timeout.read, timeoutCall);
     }
 
     /* Bump the dest connection read timeout on any activity */
     /* see Bug 3659: tunnels can be weird, with very long one-way transfers */
     if (Comm::IsConnOpen(to.conn)) {
         AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                          CommTimeoutCbPtrFun(tunnelTimeout, 
this));
         commSetConnTimeout(to.conn, Config.Timeout.read, timeoutCall);
     }
 
     if (errcode)
         from.error (xerrno);
     else if (len == 0 || !Comm::IsConnOpen(to.conn)) {
         debugs(26, 3, HERE << "Nothing to write or client gone. Terminate the 
tunnel.");
         from.conn->close();
 
         /* Only close the remote end if we've finished queueing data to it */
-        if (from.len == 0 && Comm::IsConnOpen(to.conn) ) {
+        if (from.buf.isEmpty() && Comm::IsConnOpen(to.conn)) {
             to.conn->close();
         }
     } else if (cbdataReferenceValid(this)) {
         return true;
     }
 
     return false;
 }
 
 void
-TunnelStateData::copy(size_t len, Connection &from, Connection &to, IOCB 
*completion)
+TunnelStateData::copy(Connection &from, Connection &to, IOCB *completion)
 {
     debugs(26, 3, HERE << "Schedule Write");
     AsyncCall::Pointer call = commCbCall(5,5, "TunnelBlindCopyWriteHandler",
                                          CommIoCbPtrFun(completion, this));
-    Comm::Write(to.conn, from.buf, len, call, NULL);
+    Comm::Write(to.conn, &from.buf, call);
 }
 
 /* Writes data from the client buffer to the server side */
 void
 TunnelStateData::WriteServerDone(const Comm::ConnectionPointer &, char *buf, 
size_t len, comm_err_t flag, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));
 
-    tunnelState->writeServerDone(buf, len, flag, xerrno);
+    tunnelState->writeServerDone(len, flag, xerrno);
 }
 
 void
-TunnelStateData::writeServerDone(char *buf, size_t len, comm_err_t flag, int 
xerrno)
+TunnelStateData::writeServerDone(size_t len, comm_err_t flag, int xerrno)
 {
     debugs(26, 3, HERE  << server.conn << ", " << len << " bytes written, 
flag=" << flag);
 
     /* Error? */
     if (flag != COMM_OK) {
         if (flag != COMM_ERR_CLOSING) {
             debugs(26, 4, HERE << "calling TunnelStateData::server.error(" << 
xerrno <<")");
             server.error(xerrno); // may call comm_close
         }
         return;
     }
 
     /* EOF? */
     if (len == 0) {
         debugs(26, 4, HERE << "No read input. Closing server connection.");
         server.conn->close();
         return;
     }
 
     /* Valid data */
@@ -590,57 +564,55 @@
     /* If the other end has closed, so should we */
     if (!Comm::IsConnOpen(client.conn)) {
         debugs(26, 4, HERE << "Client gone away. Shutting down server 
connection.");
         server.conn->close();
         return;
     }
 
     const CbcPointer<TunnelStateData> safetyLock(this);        /* ??? should 
be locked by the caller... */
 
     if (cbdataReferenceValid(this))
         copyRead(client, ReadClient);
 }
 
 /* Writes data from the server buffer to the client side */
 void
 TunnelStateData::WriteClientDone(const Comm::ConnectionPointer &, char *buf, 
size_t len, comm_err_t flag, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));
 
-    tunnelState->writeClientDone(buf, len, flag, xerrno);
+    tunnelState->writeClientDone(len, flag, xerrno);
 }
 
 void
 TunnelStateData::Connection::dataSent(size_t amount)
 {
-    debugs(26, 3, HERE << "len=" << len << " - amount=" << amount);
-    assert(amount == (size_t)len);
-    len =0;
-    /* increment total object size */
+    debugs(26, 3, amount << " bytes written");
 
+    /* increment total object size */
     if (size_ptr)
         *size_ptr += amount;
 }
 
 void
-TunnelStateData::writeClientDone(char *buf, size_t len, comm_err_t flag, int 
xerrno)
+TunnelStateData::writeClientDone(size_t len, comm_err_t flag, int xerrno)
 {
     debugs(26, 3, HERE << client.conn << ", " << len << " bytes written, 
flag=" << flag);
 
     /* Error? */
     if (flag != COMM_OK) {
         if (flag != COMM_ERR_CLOSING) {
             debugs(26, 4, HERE << "Closing client connection due to comm 
flags.");
             client.error(xerrno); // may call comm_close
         }
         return;
     }
 
     /* EOF? */
     if (len == 0) {
         debugs(26, 4, HERE << "Closing client connection due to 0 byte read.");
         client.conn->close();
         return;
     }
 
     /* Valid data */
@@ -665,86 +637,86 @@
 {
     TunnelStateData *tunnelState = static_cast<TunnelStateData *>(io.data);
     debugs(26, 3, HERE << io.conn);
     /* Temporary lock to protect our own feets (comm_close -> 
tunnelClientClosed -> Free) */
     CbcPointer<TunnelStateData> safetyLock(tunnelState);
 
     tunnelState->client.closeIfOpen();
     tunnelState->server.closeIfOpen();
 }
 
 void
 TunnelStateData::Connection::closeIfOpen()
 {
     if (Comm::IsConnOpen(conn))
         conn->close();
 }
 
 void
 TunnelStateData::copyRead(Connection &from, IOCB *completion)
 {
-    assert(from.len == 0);
     AsyncCall::Pointer call = commCbCall(5,4, "TunnelBlindCopyReadHandler",
                                          CommIoCbPtrFun(completion, this));
-    comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), 
call);
+    from.buf.reserveSpace(from.bytesWanted(1, SQUID_TCP_SO_RCVBUF));
+    comm_read(from.conn, from.buf, call);
 }
 
 void
 TunnelStateData::readConnectResponse()
 {
     assert(waitingForConnectResponse());
 
     AsyncCall::Pointer call = commCbCall(5,4, "readConnectResponseDone",
                                          
CommIoCbPtrFun(ReadConnectResponseDone, this));
     comm_read(server.conn, connectRespBuf->space(),
               server.bytesWanted(1, connectRespBuf->spaceSize()), call);
 }
 
 /**
  * Set the HTTP status for this request and sets the read handlers for client
  * and server side connections.
  */
 static void
 tunnelStartShoveling(TunnelStateData *tunnelState)
 {
     assert(!tunnelState->waitingForConnectExchange());
     *tunnelState->status_ptr = Http::scOkay;
     if (cbdataReferenceValid(tunnelState)) {
 
         // Shovel any payload already pushed into reply buffer by the server 
response
-        if (!tunnelState->server.len)
+        if (tunnelState->server.buf.isEmpty())
             tunnelState->copyRead(tunnelState->server, 
TunnelStateData::ReadServer);
         else {
-            debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" << Raw("", 
tunnelState->server.buf, tunnelState->server.len) << "\n----------");
-            tunnelState->copy(tunnelState->server.len, tunnelState->server, 
tunnelState->client, TunnelStateData::WriteClientDone);
+            debugs(26, DBG_DATA, "Tunnel server PUSH Payload: \n" << 
tunnelState->server.buf << "\n----------");
+            tunnelState->copy(tunnelState->server, tunnelState->client, 
TunnelStateData::WriteClientDone);
         }
 
         // Bug 3371: shovel any payload already pushed into ConnStateData by 
the client request
         if (tunnelState->http.valid() && tunnelState->http->getConn() && 
!tunnelState->http->getConn()->in.buf.isEmpty()) {
             struct ConnStateData::In *in = &tunnelState->http->getConn()->in;
             debugs(26, DBG_DATA, "Tunnel client PUSH Payload: \n" << in->buf 
<< "\n----------");
 
             // We just need to ensure the bytes from ConnStateData are in 
client.buf already to deliver
-            memcpy(tunnelState->client.buf, in->buf.rawContent(), 
in->buf.length());
-            // NP: readClient() takes care of buffer length accounting.
-            tunnelState->readClient(tunnelState->client.buf, in->buf.length(), 
COMM_OK, 0);
+            tunnelState->client.buf = in->buf;
             in->buf.consume(); // ConnStateData buffer accounting after the 
shuffle.
+            // NP: readClient() takes care of buffer length accounting.
+            tunnelState->readClient(tunnelState->client.buf.length(), COMM_OK, 
0);
         } else
             tunnelState->copyRead(tunnelState->client, 
TunnelStateData::ReadClient);
     }
 }
 
 /**
  * All the pieces we need to write to client and/or server connection
  * have been written.
  * Call the tunnelStartShoveling to start the blind pump.
  */
 static void
 tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *buf, 
size_t size, comm_err_t flag, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     debugs(26, 3, HERE << conn << ", flag=" << flag);
 
     if (flag != COMM_OK) {
         *tunnelState->status_ptr = Http::scInternalServerError;
         tunnelErrorComplete(conn->fd, data, 0);
         return;
@@ -838,41 +810,41 @@
             Comm::ConnOpener *cs = new 
Comm::ConnOpener(tunnelState->serverDestinations[0], call, 
Config.Timeout.connect);
             cs->setHost(tunnelState->url);
             AsyncJob::Start(cs);
         } else {
             debugs(26, 4, HERE << "terminate with error.");
             ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, 
Http::scServiceUnavailable, tunnelState->request.getRaw());
             *tunnelState->status_ptr = Http::scServiceUnavailable;
             err->xerrno = xerrno;
             // on timeout is this still:    err->xerrno = ETIMEDOUT;
             err->port = conn->remote.port();
             err->callback = tunnelErrorComplete;
             err->callback_data = tunnelState;
             errorSend(tunnelState->client.conn, err);
         }
         return;
     }
 
 #if USE_DELAY_POOLS
     /* no point using the delayIsNoDelay stuff since tunnel is nice and simple 
*/
     if (conn->getPeer() && conn->getPeer()->options.no_delay)
-        tunnelState->server.setDelayId(DelayId());
+        tunnelState->server.delayId = DelayId();
 #endif
 
     tunnelState->request->hier.note(conn, tunnelState->getHost());
 
     tunnelState->server.conn = conn;
     tunnelState->request->peer_host = conn->getPeer() ? conn->getPeer()->host 
: NULL;
     comm_add_close_handler(conn->fd, tunnelServerClosed, tunnelState);
 
     debugs(26, 4, HERE << "determine post-connect handling pathway.");
     if (conn->getPeer()) {
         tunnelState->request->peer_login = conn->getPeer()->login;
         tunnelState->request->flags.proxying = 
!(conn->getPeer()->options.originserver);
     } else {
         tunnelState->request->peer_login = NULL;
         tunnelState->request->flags.proxying = false;
     }
 
     if (tunnelState->request->flags.proxying)
         tunnelState->connectToPeer();
     else {
@@ -906,41 +878,41 @@
          * default is to allow.
          */
         ACLFilledChecklist ch(Config.accessList.miss, request, NULL);
         ch.src_addr = request->client_addr;
         ch.my_addr = request->my_addr;
         if (ch.fastCheck() == ACCESS_DENIED) {
             debugs(26, 4, HERE << "MISS access forbidden.");
             err = new ErrorState(ERR_FORWARDING_DENIED, Http::scForbidden, 
request);
             *status_ptr = Http::scForbidden;
             errorSend(http->getConn()->clientConnection, err);
             return;
         }
     }
 
     debugs(26, 3, request->method << ' ' << url << ' ' << request->http_ver);
     ++statCounter.server.all.requests;
     ++statCounter.server.other.requests;
 
     tunnelState = new TunnelStateData;
 #if USE_DELAY_POOLS
-    tunnelState->server.setDelayId(DelayId::DelayClient(http));
+    tunnelState->server.delayId = DelayId::DelayClient(http);
 #endif
     tunnelState->url = xstrdup(url);
     tunnelState->request = request;
     tunnelState->server.size_ptr = size_ptr;
     tunnelState->status_ptr = status_ptr;
     tunnelState->client.conn = http->getConn()->clientConnection;
     tunnelState->http = http;
     tunnelState->al = al;
 
     comm_add_close_handler(tunnelState->client.conn->fd,
                            tunnelClientClosed,
                            tunnelState);
 
     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                      CommTimeoutCbPtrFun(tunnelTimeout, 
tunnelState));
     commSetConnTimeout(tunnelState->client.conn, Config.Timeout.lifetime, 
timeoutCall);
 
     peerSelect(&(tunnelState->serverDestinations), request, al,
                NULL,
                tunnelPeerSelectComplete,
@@ -1063,29 +1035,20 @@
     delete err;
 
     GetMarkingsToServer(tunnelState->request.getRaw(), 
*tunnelState->serverDestinations[0]);
 
     debugs(26, 3, HERE << "paths=" << peer_paths->size() << ", p[0]={" << 
(*peer_paths)[0] << "}, serverDest[0]={" <<
            tunnelState->serverDestinations[0] << "}");
 
     AsyncCall::Pointer call = commCbCall(26,3, "tunnelConnectDone", 
CommConnectCbPtrFun(tunnelConnectDone, tunnelState));
     Comm::ConnOpener *cs = new 
Comm::ConnOpener(tunnelState->serverDestinations[0], call, 
Config.Timeout.connect);
     cs->setHost(tunnelState->url);
     AsyncJob::Start(cs);
 }
 
 CBDATA_CLASS_INIT(TunnelStateData);
 
 bool
 TunnelStateData::noConnections() const
 {
     return !Comm::IsConnOpen(server.conn) && !Comm::IsConnOpen(client.conn);
 }
-
-#if USE_DELAY_POOLS
-void
-TunnelStateData::Connection::setDelayId(DelayId const &newDelay)
-{
-    delayId = newDelay;
-}
-
-#endif

Reply via email to