This patch abstracts the TCP socket operations out of existing
client-side code into a class which can be re-used for any TCP socket code.

It provides:

* data members for referencing a TCP socket and read buffer.

* methods for incremental parsing or processing of received data.

* methods for managing half and fully closing a socket.

* separation of socket read() from HTTP/1 parsing and processing logic.
 It may be re-used for any client-side or server-side TCP socket.


This patch includes only the Comm::TcpReceiver class and integration
with src/comm/. The integration and replacement of client-side code is
contained in the larger followup part-2 patch.

NOTE: the old delay-aware and half-closed monitoring logic is left out
of this class, as it is currently very specific to the client-side and
server-side code.

Amos
=== modified file 'src/comm/Makefile.am'
--- src/comm/Makefile.am        2012-03-29 09:22:41 +0000
+++ src/comm/Makefile.am        2013-10-16 12:14:31 +0000
@@ -23,6 +23,8 @@
        ModSelectWin32.cc \
        TcpAcceptor.cc \
        TcpAcceptor.h \
+       TcpReceiver.cc \
+       TcpReceiver.h \
        UdpOpenDialer.h \
        Write.cc \
        Write.h \

=== added file 'src/comm/TcpReceiver.cc'
--- src/comm/TcpReceiver.cc     1970-01-01 00:00:00 +0000
+++ src/comm/TcpReceiver.cc     2013-10-25 15:55:23 +0000
@@ -0,0 +1,246 @@
+/*
+ * DEBUG: section 05    TCP Read
+ *
+ * - level 2 minor TCP errors
+ * - level 3 duplicate reasons for halting I/O (bugs? only need to halt once)
+ * - level 4 reasons for errors and halting I/O
+ * - level 5 common I/O and buffer activity
+ */
+#include "squid.h"
+#include "comm.h"
+#include "comm/TcpReceiver.h"
+#include "Debug.h"
+#include "fd.h"
+#include "fde.h"
+#include "StatCounters.h"
+#include "tools.h"
+
+Comm::TcpReceiver::TcpReceiver(const Comm::ConnectionPointer &c) :
+        AsyncJob("Comm::TcpReceiver"),
+        tcp(c),
+        stoppedReceiving_(NULL), // no receiving error recorded yet
+        stoppedSending_(NULL), // no sending error recorded yet
+        closed_(), // assigned later by tcpConnectionInit()
+        reader_() // assigned by readSomeData() while a read is pending
+{}
+
+void
+Comm::TcpReceiver::tcpConnectionInit()
+{
+    /* Ideally the constructor would register the close handler, but doing so
+     * involves calls to the toCbdata() virtual function implemented by the
+     * derived class, so that class's constructor must call this explicitly.
+     * The callback is logged under debug section 5, like the rest of this file. */
+
+    typedef CommCbMemFunT<Comm::TcpReceiver, CommCloseCbParams> Dialer;
+    closed_ = JobCallback(5, 5, Dialer, this, Comm::TcpReceiver::tcpConnectionClosed);
+    comm_add_close_handler(tcp->fd, closed_);
+}
+
+bool
+Comm::TcpReceiver::doneAll() const
+{ // AsyncJob API: done only once both directions stopped and inBuf is empty
+    return stoppedSending() && stoppedReceiving() && !inBuf.hasContent() && AsyncJob::doneAll();
+}
+
+void
+Comm::TcpReceiver::swanSong()
+{
+    if (closed_ != NULL) // stays NULL when tcpConnectionInit() was never called
+        closed_->cancel("Comm::TcpReceiver::swanSong");
+
+    if (Comm::IsConnOpen(tcp)) // close the socket only if it is still open
+        tcp->close();
+}
+
+void
+Comm::TcpReceiver::stopReading()
+{
+    /* NP: This is a hack only needed to allow TunnelStateData
+     * to take control of a socket despite any scheduled read.
+     * It does not record a stop reason, so readSomeData() may be used again. */
+    if (reading()) {
+        comm_read_cancel(tcp->fd, reader_);
+        reader_ = NULL; // reading() reports false from now on
+    }
+}
+
+void
+Comm::TcpReceiver::stopReceiving(const char *error)
+{
+    debugs(5, 4, "receiving error (" << tcp << "): " << error <<
+           "; old sending error: " << (stoppedSending() ? stoppedSending_ : "none"));
+
+    if (const char *oldError = stoppedReceiving()) {
+        debugs(5, 3, "already stopped receiving: " << oldError);
+        return; // nothing has changed as far as this connection is concerned
+    }
+    stoppedReceiving_ = error; // only the first reason is kept
+
+    if (const char *sendError = stoppedSending()) {
+        debugs(5, 3, "closing because also stopped sending: " << sendError);
+        if (closed_ != NULL) // only set by tcpConnectionInit(); swanSong() checks it too
+            closed_->cancel("graceful close");
+        tcp->close();
+    }
+}
+
+void
+Comm::TcpReceiver::stopSending(const char *error)
+{
+    debugs(5, 4, "sending error (" << tcp << "): " << error <<
+           "; old receiving error: " <<
+           (stoppedReceiving() ? stoppedReceiving_ : "none"));
+
+    if (const char *oldError = stoppedSending()) {
+        debugs(5, 3, "already stopped sending: " << oldError);
+        return; // nothing has changed as far as this connection is concerned
+    }
+    stoppedSending_ = error; // only the first reason is kept
+    if (!stoppedReceiving()) {
+        // still receiving: ask the child class whether more input is expected
+        if (const int64_t expecting = mayNeedToReadMore()) {
+            debugs(5, 5, "must still read " << expecting <<
+                   " bytes with " << inBuf.contentSize() << " unused");
+            return; // wait for the request receiver to finish reading
+        }
+    }
+    if (closed_ != NULL) // only set by tcpConnectionInit(); swanSong() checks it too
+        closed_->cancel("graceful close");
+    tcp->close(); // no further reads are needed
+}
+
+bool
+Comm::TcpReceiver::maybeMakeSpaceAvailable()
+{
+    if (inBuf.spaceSize() < 2) { // fewer than two bytes of free space left
+        if (!inBuf.hasPotentialSpace()) {
+            debugs(5, 5, "buffer full: " << inBuf.contentSize() << " of " << (inBuf.max_capacity-1) << " bytes");
+            return false; // the caller must not schedule a read
+        }
+        (void)inBuf.space(inBuf.contentSize()*2); // ask for room for twice the current content
+        debugs(5, 5, "growing buffer: content-size=" << inBuf.contentSize() << " capacity=" << inBuf.capacity);
+    }
+    return true;
+}
+
+void
+Comm::TcpReceiver::readSomeData()
+{
+    // one read() at a time
+    if (reading())
+        return;
+
+    // useless to read() after aborting read()
+    if (stoppedReceiving())
+        return;
+
+    // useless to try when there is no buffer space available
+    if (!maybeMakeSpaceAvailable())
+        return;
+
+    debugs(5, 5, tcp << ": reading... buffer space " << inBuf.spaceSize() << " bytes.");
+
+    typedef CommCbMemFunT<Comm::TcpReceiver, CommIoCbParams> Dialer; // logged under section 5, like this file
+    reader_ = JobCallback(5, 5, Dialer, this, Comm::TcpReceiver::readIoHandler);
+    comm_read(tcp, inBuf.space(), inBuf.spaceSize(), reader_); // read into the buffer's free space
+}
+
+/// identifies whether the read() event was due to a network error happening
+bool
+Comm::TcpReceiver::readWasError(comm_err_t flag, int size, int xerrno) const
+{
+    if (flag != COMM_OK) {
+        debugs(5, 2, tcp << ": got flag " << flag);
+        return true;
+    }
+
+    if (size < 0) {
+        if (!ignoreErrno(xerrno)) {
+            debugs(5, 2, tcp << ": " << xstrerr(xerrno));
+            return true;
+        } else if (!inBuf.hasContent()) {
+            debugs(5, 2, tcp << ": no data to process (" << xstrerr(xerrno) << 
")");
+        }
+    }
+
+    return false;
+}
+
+void
+Comm::TcpReceiver::readIoHandler(const CommIoCbParams &io)
+{
+    debugs(5, 5, io.conn << " size " << io.size);
+    Must(reading());
+    reader_ = NULL; // the read scheduled by readSomeData() has completed
+
+    /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */
+    if (io.flag == COMM_ERR_CLOSING) {
+        debugs(5, 5, io.conn << " closing Bailout.");
+        return;
+    }
+
+    assert(Comm::IsConnOpen(tcp));
+    assert(io.conn->fd == tcp->fd);
+
+    /*
+     * Don't reset the timeout value here.  The timeout value will be
+     * set to Config.Timeout.request by httpAccept() and
+     * clientWriteComplete(), and should apply to the request as a
+     * whole, not individual read() calls.  Plus, it breaks our
+     * lame half-close detection
+     */
+    if (readWasError(io.flag, io.size, io.xerrno)) {
+        noteTcpReadError(io.xerrno); // let the child class react before closing
+        io.conn->close();
+        return;
+    }
+
+    if (io.flag == COMM_OK) {
+        if (io.size > 0) {
+            kb_incr(&(statCounter.client_http.kbytes_in), io.size); // NOTE(review): client-side counter; confirm for server-side reuse
+            inBuf.append(io.buf, io.size);
+            // NOTE(review): io.buf is presumably the inBuf.space() passed to comm_read(); confirm append() is intended
+        } else if (io.size == 0) {
+            debugs(5, 5, io.conn << " closed?");
+            stopReceiving("client zero sized read");
+
+            // if already stopped sending, the above will close the connection
+            if (stoppedSending())
+                return;
+
+            // if the connection is still possibly sending
+            // the child class may be able to stop immediately
+            if (const char *reason = maybeFinishedWithTcp()) {
+                stopSending(reason); // will close connection
+                return;
+            }
+
+            /* It might be half-closed, we can't tell */
+            fd_table[io.conn->fd].flags.socket_eof = true;
+            commMarkHalfClosed(io.conn->fd);
+            fd_note(io.conn->fd, "half-closed");
+        }
+    }
+
+    bool mayReadMore = true;
+    // pass handling on to child instance code
+    if (inBuf.hasContent())
+        mayReadMore = processReadBuffer(inBuf);
+
+    if (!maybeMakeSpaceAvailable()) {
+        stopReceiving("full read buffer - but processing does not free any space");
+        mayReadMore = false;
+    }
+
+    // schedule another read() - unless aborted by processing actions
+    if (mayReadMore)
+        readSomeData();
+}
+
+/* Close handler registered by tcpConnectionInit(); normally called by comm_close() */
+void
+Comm::TcpReceiver::tcpConnectionClosed(const CommCloseCbParams &io)
+{
+    stopReceiving("TCP connection closed"); // records the reason unless one is already set
+    stopSending("TCP connection closed"); // likewise for the sending direction
+}

=== added file 'src/comm/TcpReceiver.h'
--- src/comm/TcpReceiver.h      1970-01-01 00:00:00 +0000
+++ src/comm/TcpReceiver.h      2013-10-25 15:29:14 +0000
@@ -0,0 +1,105 @@
+#ifndef SQUID_SRC_COMM_TCPRECEIVER_H
+#define SQUID_SRC_COMM_TCPRECEIVER_H
+
+#include "base/AsyncCall.h"
+#include "base/AsyncJob.h"
+#include "comm/Connection.h"
+#include "comm_err_t.h"
+#include "MemBuf.h"
+
+class CommIoCbParams;
+
+namespace Comm {
+
+/// An AsyncJob that reads from a TCP connection into inBuf and hands the data to its child class.
+class TcpReceiver : virtual public AsyncJob
+{
+public:
+    explicit TcpReceiver(const Comm::ConnectionPointer &c);
+    virtual ~TcpReceiver() {}
+
+    // AsyncJob API
+    virtual bool doneAll() const;
+    virtual void swanSong();
+
+    /// register the close callback; the derived class's constructor must call this
+    void tcpConnectionInit();
+
+    /// whether a read() operation is currently underway
+    bool reading() const {return reader_!=NULL;}
+
+    /// cancels a read if one is scheduled, without blocking future socket use
+    /// \note this is a hack for TunnelStateData integration. Do not use for new code.
+    void stopReading();
+
+    /// record why receiving stopped; closes the connection if sending has already stopped
+    void stopReceiving(const char *error);
+
+    /// the reason we stopped receiving, or NULL if we have not
+    const char *stoppedReceiving() const { return stoppedReceiving_; }
+
+    /// record why sending stopped; closes the connection unless more reads are still needed
+    void stopSending(const char *error);
+
+    /// the reason we stopped sending, or NULL if we have not
+    const char *stoppedSending() const { return stoppedSending_; }
+
+    /// called when sending has stopped; a non-zero result postpones closing the connection
+    virtual int64_t mayNeedToReadMore() const = 0;
+
+    /// ensures the buffer can receive new network data; false if it is full and cannot grow
+    bool maybeMakeSpaceAvailable();
+
+    /** called when there is new buffered data to process
+     * If the processing requires further read() to be halted temporarily it
+     * may return false. The processor is then responsible for ensuring that
+     * readSomeData() is called when read() are to be resumed.
+     *
+     *  \retval true  if additional read() are acceptable.
+     *  \retval false if read() are to be halted.
+     */
+    virtual bool processReadBuffer(MemBuf &) = 0;
+
+    /// called with the errno when there is an I/O error reading; the connection is closed afterwards
+    virtual void noteTcpReadError(int) {}
+
+    /// Attempt to read some data; does nothing if already reading, stopped, or out of buffer space.
+    /// Will call processReadBuffer() when there is data to process.
+    void readSomeData();
+
+    /// callback to handle TCP read() input
+    void readIoHandler(const CommIoCbParams &io);
+
+    /**
+     * called when TCP 0-size read occurs to ask the child class
+     * whether it is able to stop sending yet.
+     *
+     * \return a reason for stopping I/O,
+     *         or NULL to continue I/O with client half-closed.
+     */
+    virtual const char * maybeFinishedWithTcp() = 0;
+
+    Comm::ConnectionPointer tcp; ///< the connection this job reads from
+
+    MemBuf inBuf; ///< received data passed to processReadBuffer()
+
+private:
+    bool readWasError(comm_err_t flag, int size, int xerrno) const;
+    void tcpConnectionClosed(const CommCloseCbParams &io); // NOTE(review): only CommIoCbParams is forward-declared in this header; confirm an include declares CommCloseCbParams
+
+    /// the reason why we no longer read the request or nil
+    const char *stoppedReceiving_;
+
+    /// the reason why we no longer write the response or nil
+    const char *stoppedSending_;
+
+    /// callback to stop traffic processing when FD closes
+    AsyncCall::Pointer closed_;
+
+    /// set when we are reading
+    AsyncCall::Pointer reader_;
+};
+
+} // namespace Comm
+
+#endif /* SQUID_SRC_COMM_TCPRECEIVER_H */

Reply via email to