BBlack has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/384167 )

Change subject: Refactor/improve purging code
......................................................................

Refactor/improve purging code

This refactor removes the PST_NOTCONN_IDLE state from the state
machine.  The code no longer has such a state as it always
reconnects immediately, even if there is no currently-pending
output.

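Much of the rest of the logic was de-duplicated into a few new
helper functions, and some of the internal state magic was made a
bit more idempotent.

Note that purger_enqueue() no longer kicks off sending by itself;
callers must follow enqueues with the new purger_ping() so that a
purger sitting in the connected-idle state picks up queued requests.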

Change-Id: Icd5e96a1bae4dac65c81f80b07f99e9388608fe0
---
M src/main.c
M src/purger.c
M src/purger.h
M src/receiver.c
4 files changed, 172 insertions(+), 265 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/software/varnish/vhtcpd 
refs/changes/67/384167/1

diff --git a/src/main.c b/src/main.c
index 23896f7..aa338a2 100644
--- a/src/main.c
+++ b/src/main.c
@@ -34,7 +34,7 @@
 /* global libev priorities:
  *  2) receiver input
  *  1) purger i/o
- *  0) purger idle timer
+ *  0) purger timeouts/timers
  * -1) strq excess space reclamation
  * -2) stats/monitor stuff...
  */
diff --git a/src/purger.c b/src/purger.c
index 80e6ff8..e0e5713 100644
--- a/src/purger.c
+++ b/src/purger.c
@@ -38,9 +38,6 @@
 #include "http-parser/http_parser.h"
 
 // XXX some of the below could be configurable as well
-// XXX note that while this is set up to try to take advantage
-//   of keep-alive, apparently varnish closes after every
-//   response to a PURGE, currently.
 
 // this buffer holds the complete HTTP response we get, and can grow at runtime
 #define INBUF_INITSIZE 4096U
@@ -59,7 +56,7 @@
 #define PERSIST_REQS 100000U
 #define PERSIST_TIME 900.0
 
-// These are the 6 possible states of the purger object.
+// These are the 5 possible states of the purger object.
 // Note in the state transition code that we often *could* skip
 //   straight through multiple states without returning to libev
 //   (at least try and see if the next call doesn't EAGAIN),
@@ -71,62 +68,46 @@
 //   but we still fully update the ev watcher states during these
 //   to keep the code simpler to follow, even when it's likely
 //   to be pointless churn.
+// Note that all states use the timeout watcher, so we use it in
+// ev_timer_again() mode for efficiency (method 2 in the libev docs).
 
 typedef enum {
-    // In this state, no message is pending in outbuf or the queue,
-    //   and we have no active connection and no active libev watchers.
-    // The only way out of NOTCONN_IDLE is a new purger_enqueue() call
-    //   from the receiver code.
-    PST_NOTCONN_IDLE = 0,
-
-    // In this state, there's a message pending in outbuf, and there
-    //   may or may not be more in the queue, and we're in the process
-    //   of trying to establish the outbound connection (waiting on
-    //   nonblock connect() success).
+    // In this state we're in the process of trying to establish the
+    //   outbound connection (waiting on nonblock connect() success).
+    // Possible next states: PST_SENDWAIT, PST_CONN_IDLE, PST_NOTCONN_WAIT
+    // Active watchers: write, timeout
     PST_CONNECTING,
 
-    // This is an exception state that occurs when the connect()
-    //   attempt above fails.  We wait a short timeout before moving
-    //   back to CONNECTING and trying again.  Note that in other
-    //   connection failure cases (during read/write), we immediately
-    //   retry the connection first, and don't wait until that
-    //   connect() attempt fails.
+    // While disconnects that occur from other states (send, recv, idle)
+    //   result in an immediate reconnect attempt (connecting, above), a
+    //   failure during connecting itself results in exponential backoff
+    //   delays between reconnection attempts, and they wait out their
+    //   timers in this state.
+    // Possible next states: PST_CONNECTING
+    // Active watchers: timeout
     PST_NOTCONN_WAIT,
 
-    // In these two states, outbuf has a complete message pending,
-    //   and we have a live connection to use.
-    // In the SENDWAIT state we've written less than all bytes.
-    // In the RECVWAIT state we've written all bytes and haven't
-    //   yet read a complete response.
-    // If we eventually succeed in both sending the complete message
-    //   and receiving an acceptable response code, the transaction
-    //   will finish and outbuf will be cleared.  Whether we immediately
-    //   shift back to SENDWAIT or CONN_IDLE depends on the queue.
-    // Various failure scenarios lead to other outcomes.  Some bad
-    //   status returns from the server should lead to dropping the
-    //   current outbuf and moving on (possible bad URL).  Some
-    //   should result in terminating the connection but trying
-    //   the same URL again on the next connection.
+    // SENDWAIT means we're waiting to send request bytes to the purger.
+    // Possible next states: PST_RECVWAIT, PST_CONN_IDLE, PST_CONNECTING
+    // Active watchers: read, write, timeout
     PST_SENDWAIT,
+
+    // Possible next states: PST_SENDWAIT, PST_CONN_IDLE, PST_CONNECTING
+    // Active watchers: read, timeout
     PST_RECVWAIT,
 
     // In this state, outbuf is empty, the queue is empty, but we still
     //   have a live HTTP connection to the server from a previous
     //   message,  and we're ready to send another message if one arrives
     //   via the queue.
-    // If the server disconnects us or we hit our own idle timeout
-    //   and disconnect from it, we'll move back to _NOTCONN and
-    //   wait there until another message needs to be sent.  If
-    //   a new URL comes in via purger_enqueue() before that, we'll
-    //   move straight back to _SENDWAIT (and then possibly right
-    //   back through to _RECVWAIT as well).
+    // Possible next states: PST_SENDWAIT, PST_CONNECTING
+    // Active watchers: read, timeout
     PST_CONN_IDLE,
 } purger_state_t;
 
 // for debug logging
 #ifndef NDEBUG
 static const char* state_strs[] = {
-    "NOTCONN_IDLE",
     "CONNECTING",
     "NOTCONN_WAIT",
     "SENDWAIT",
@@ -213,32 +194,17 @@
     dmn_assert(s->write_watcher);
     dmn_assert(s->read_watcher);
     dmn_assert(s->timeout_watcher);
+    dmn_assert(ev_is_active(s->timeout_watcher));
 
     switch(s->state) {
-        case PST_NOTCONN_IDLE:
-            dmn_assert(s->fd == -1);
-            dmn_assert(!s->outbuf_bytes);
-            dmn_assert(!s->outbuf_written);
-            dmn_assert(!s->inbuf_parsed);
-            dmn_assert(!s->fd_reqs);
-            dmn_assert(!ev_is_active(s->write_watcher));
-            dmn_assert(!ev_is_active(s->read_watcher));
-            dmn_assert(!ev_is_active(s->timeout_watcher));
-            break;
         case PST_CONNECTING:
             dmn_assert(s->fd != -1);
-            dmn_assert(s->outbuf_bytes);
-            dmn_assert(!s->outbuf_written);
-            dmn_assert(!s->inbuf_parsed);
             dmn_assert(!s->fd_reqs);
             dmn_assert(ev_is_active(s->write_watcher));
             dmn_assert(!ev_is_active(s->read_watcher));
             break;
         case PST_NOTCONN_WAIT:
             dmn_assert(s->fd == -1);
-            dmn_assert(s->outbuf_bytes);
-            dmn_assert(!s->outbuf_written);
-            dmn_assert(!s->inbuf_parsed);
             dmn_assert(!s->fd_reqs);
             dmn_assert(!ev_is_active(s->write_watcher));
             dmn_assert(!ev_is_active(s->read_watcher));
@@ -273,54 +239,127 @@
 
 #endif
 
-// rv: false -> placed something in outbuf
-//     true -> queue empty, nothing placed in outbuf
-static bool dequeue_to_outbuf(purger_t* s) {
-    dmn_assert(s);
-    dmn_assert(s->outbuf);
-    dmn_assert(!s->outbuf_bytes); // no other packet currently buffered
-    dmn_assert(!s->outbuf_written); // no other packet currently buffered
-
-    unsigned req_len;
-    const char* req;
-    req = strq_dequeue(s->queue, &req_len);
-    if(req) {
-        memcpy(s->outbuf, req, req_len);
-        s->outbuf_bytes = req_len;
-        return false;
+static void close_socket(purger_t* s) {
+    ev_io_stop(s->loop, s->write_watcher);
+    ev_io_stop(s->loop, s->read_watcher);
+    if(s->fd != -1) {
+        shutdown(s->fd, SHUT_RDWR);
+        close(s->fd);
     }
-
-    return true;
-}
-
-// Common socket-close code
-static void purger_closefd(purger_t* s) {
-    shutdown(s->fd, SHUT_RDWR);
-    close(s->fd);
     s->fd = -1;
     s->fd_expire = 0.;
     s->fd_reqs = 0;
 }
 
-static bool purger_check_fd_limits(purger_t* s) {
-    return (s->fd_reqs > PERSIST_REQS)
-        || (s->fd_expire <= ev_now(s->loop));
+static void purger_connect(purger_t* s);
+
+static void do_reconnect_socket(purger_t* s) {
+    close_socket(s);
+    purger_connect(s);
+}
+
+// rv "idle": false -> outbuf has a purge to send
+//            true -> queue empty, nothing in outbuf (idle-time!)
+// This should work for all outbuf/queue states.
+static bool _txn_prep_buffers(purger_t* s, const bool clear_current) {
+    dmn_assert(s);
+    dmn_assert(s->outbuf);
+    dmn_assert(s->inbuf);
+
+    bool idle = true;
+
+    s->inbuf_parsed = 0;
+    s->outbuf_written = 0;
+    if(clear_current)
+        s->outbuf_bytes = 0;
+
+    if(s->outbuf_bytes) {
+        idle = false;
+    } else {
+        unsigned req_len;
+        const char* req;
+        req = strq_dequeue(s->queue, &req_len);
+        if(req) {
+            memcpy(s->outbuf, req, req_len);
+            s->outbuf_bytes = req_len;
+            idle = false;
+        }
+    }
+
+    return idle;
+}
+
+// Called at any txn/connection boundary (purge success/fail, connection success).
+// Not called while cycling within reconnect attempt callbacks.
+// If the "reconnect" argument is true, this causes a disconnect->reconnect cycle,
+// otherwise it ensures the buffer has a live request if possible and moves to
+// either the idle state or the sending state.  "clear_current" wipes the
+// currently-buffered request in the case it's suspected of being malformed.
+static void on_txn_boundary(purger_t* s, const bool clear_current, const bool reconnect) {
+    dmn_assert(s);
+
+    const bool idle = _txn_prep_buffers(s, clear_current);
+
+    // reconnect if asked to, or if this conn passed its persistence limits
+    if(reconnect || s->fd_reqs > PERSIST_REQS || s->fd_expire <= ev_now(s->loop)) {
+        do_reconnect_socket(s);
+    } else if(idle) {
+        s->state = PST_CONN_IDLE;
+        ev_io_stop(s->loop, s->write_watcher);
+        s->timeout_watcher->repeat = s->idle_timeout; // CONN_IDLE waits out idle_timeout, not io_timeout
+        ev_timer_again(s->loop, s->timeout_watcher);
+    } else {
+        s->state = PST_SENDWAIT;
+        ev_io_start(s->loop, s->write_watcher);
+        s->timeout_watcher->repeat = s->io_timeout;
+        ev_timer_again(s->loop, s->timeout_watcher);
+        ev_invoke(s->loop, s->write_watcher, EV_WRITE);
+    }
+}
+
+static void on_connect_success(purger_t* s) {
+    dmn_assert(s);
+    dmn_assert(s->state == PST_CONNECTING);
+
+    s->fd_expire = ev_now(s->loop) + PERSIST_TIME;
+    s->conn_wait_timeout = CONN_WAIT_INIT;
+    ev_io_set(s->read_watcher, s->fd, EV_READ);
+    ev_io_start(s->loop, s->read_watcher);
+    on_txn_boundary(s, false, false);
+}
+
+static void on_connect_fail(purger_t* s, const char* reason, const int 
so_error) {
+    dmn_assert(s);
+    dmn_assert(s->state == PST_CONNECTING);
+
+    dmn_log_err(
+        "TCP connect to %s failed (%s): %s",
+        dmn_logf_anysin(&s->daddr), reason,
+        so_error ? dmn_logf_errnum(so_error) : dmn_logf_errno()
+    );
+    close_socket(s);
+    s->state = PST_NOTCONN_WAIT;
+    if(s->conn_wait_timeout < CONN_WAIT_MAX)
+        s->conn_wait_timeout <<= 1;
+    s->timeout_watcher->repeat = s->conn_wait_timeout;
+    ev_timer_again(s->loop, s->timeout_watcher);
 }
 
 static void purger_connect(purger_t* s) {
     dmn_assert(s);
 
-    dmn_log_debug("purger: %s/%s -> hit purger_connect()", 
dmn_logf_anysin(&s->daddr), state_strs[s->state]);
+    dmn_log_debug("purger: %s/%s -> purger_connect()", 
dmn_logf_anysin(&s->daddr), state_strs[s->state]);
 
-    // we arrive in this function from several states/callbacks, but
-    //   in all cases they should put us in this intermediate state first:
+    // we arrive in this function from several states/callbacks, but in
+    // all cases they should put us in this intermediate state first (no
+    // half-processed in or out buffers, no active i/o watchers, no
+    // socket fd)
     dmn_assert(s->fd == -1);
-    dmn_assert(s->outbuf_bytes);
-    dmn_assert(!s->outbuf_written);
-    dmn_assert(!s->inbuf_parsed);
-    dmn_assert(!ev_is_active(s->timeout_watcher));
     dmn_assert(!ev_is_active(s->read_watcher));
     dmn_assert(!ev_is_active(s->write_watcher));
+
+    // set our proper state during connection attempts
+    s->state = PST_CONNECTING;
 
     // cache the protoent, because in many cases this blocks reading a 
database...
     static struct protoent* pe = NULL;
@@ -347,36 +386,18 @@
     //   a nonblocking socket, but it's possible it succeeds immediately for 
localhost...
     if(connect(s->fd, &s->daddr.sa, s->daddr.len) == -1) {
         if(errno != EINPROGRESS) {
-            // hard/fast failure
-            dmn_log_err("TCP connect to %s failed: %s", 
dmn_logf_anysin(&s->daddr), dmn_logf_errno());
-            purger_closefd(s);
-            s->state = PST_NOTCONN_WAIT;
-            if(s->conn_wait_timeout < CONN_WAIT_MAX)
-                s->conn_wait_timeout <<= 1;
-            ev_timer_set(s->timeout_watcher, s->conn_wait_timeout, 0.);
-            ev_timer_start(s->loop, s->timeout_watcher);
+            on_connect_fail(s, "immediate", 0);
         }
         else {
             // return to libev until connection is ready
-            s->state = PST_CONNECTING;
             ev_io_set(s->write_watcher, s->fd, EV_WRITE);
             ev_io_start(s->loop, s->write_watcher);
-            ev_timer_set(s->timeout_watcher, s->io_timeout, 0.);
-            ev_timer_start(s->loop, s->timeout_watcher);
+            s->timeout_watcher->repeat = s->io_timeout;
+            ev_timer_again(s->loop, s->timeout_watcher);
         }
     }
-    else {
-        // immediate success, straight to send attempt
-        s->fd_expire = ev_now(s->loop) + PERSIST_TIME;
-        s->state = PST_SENDWAIT;
-        s->conn_wait_timeout = CONN_WAIT_INIT;
-        ev_io_set(s->write_watcher, s->fd, EV_WRITE);
-        ev_io_start(s->loop, s->write_watcher);
-        ev_io_set(s->read_watcher, s->fd, EV_READ);
-        ev_io_start(s->loop, s->read_watcher);
-        ev_timer_set(s->timeout_watcher, s->io_timeout, 0.);
-        ev_timer_start(s->loop, s->timeout_watcher);
-        ev_invoke(s->loop, s->write_watcher, EV_WRITE);
+    else { // immediately-successful connect!
+        on_connect_success(s);
     }
 }
 
@@ -384,7 +405,7 @@
     dmn_assert(loop); dmn_assert(w); dmn_assert(revents == EV_WRITE);
 
     purger_t* s = w->data;
-    dmn_log_debug("purger: %s/%s -> hit purger_write_cb()", 
dmn_logf_anysin(&s->daddr), state_strs[s->state]);
+    dmn_log_debug("purger: %s/%s -> purger_write_cb()", 
dmn_logf_anysin(&s->daddr), state_strs[s->state]);
     purger_assert_sanity(s);
 
     // This callback only happens in two states: CONNECTING and SENDWAIT...
@@ -393,25 +414,11 @@
         int so_error = 0;
         unsigned int so_error_len = sizeof(so_error);
         getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_len);
-        if(so_error) {
-            dmn_log_err("TCP connect() failed: %s", dmn_logf_errnum(so_error));
-            purger_closefd(s);
-            s->state = PST_NOTCONN_WAIT;
-            if(s->conn_wait_timeout < CONN_WAIT_MAX)
-                s->conn_wait_timeout <<= 1;
-            ev_io_stop(s->loop, s->write_watcher);
-            ev_timer_stop(s->loop, s->timeout_watcher);
-            ev_timer_set(s->timeout_watcher, s->conn_wait_timeout, 0.);
-            ev_timer_start(s->loop, s->timeout_watcher);
-            return;
-        }
-        else { // successful connect(), alter state and fall through to the 
first send attempt
-            s->fd_expire = ev_now(s->loop) + PERSIST_TIME;
-            s->state = PST_SENDWAIT;
-            s->conn_wait_timeout = CONN_WAIT_INIT;
-            ev_io_set(s->read_watcher, s->fd, EV_READ);
-            ev_io_start(s->loop, s->read_watcher);
-        }
+        if(so_error)
+            on_connect_fail(s, "nonblock", so_error);
+        else
+            on_connect_success(s);
+        return;
     }
 
     dmn_assert(s->state == PST_SENDWAIT);
@@ -436,13 +443,8 @@
                 dmn_log_err("TCP conn to %s failed while writing: %s", 
dmn_logf_anysin(&s->daddr), dmn_logf_errno());
         }
 
-        // reset state to try this send from the top on a fresh connection...
-        s->outbuf_written = 0;
-        purger_closefd(s);
-        ev_io_stop(s->loop, s->write_watcher);
-        ev_io_stop(s->loop, s->read_watcher);
-        ev_timer_stop(s->loop, s->timeout_watcher);
-        purger_connect(s);
+        // close up connection and reconnect, do not clear current output 
buffer
+        on_txn_boundary(s, false, true);
     }
     else {
         if(writerv < (int)to_send) {
@@ -459,47 +461,12 @@
     }
 }
 
-// Common "clean up connection" code for multiple points in the read_cb below
-// If clear_current is true, wipe the current request and decide next state
-//   based on presence of another queued request, otherwise reconnect to
-//   re-send the current request.
-static void close_from_read_cb(purger_t* s, const bool clear_current) {
-    dmn_assert(s);
-    dmn_assert(s->state == PST_CONN_IDLE || s->state == PST_SENDWAIT || 
s->state == PST_RECVWAIT);
-
-    purger_closefd(s);
-    ev_timer_stop(s->loop, s->timeout_watcher);
-    ev_io_stop(s->loop, s->read_watcher);
-
-    if(s->state == PST_CONN_IDLE) {
-        dmn_assert(!clear_current); // there was no "current" buffer output
-        s->state = PST_NOTCONN_IDLE;
-    }
-    else { // SENDWAIT or RECVWAIT
-        if(s->state == PST_SENDWAIT)
-            ev_io_stop(s->loop, s->write_watcher);
-        else
-            s->inbuf_parsed = 0;
-        s->outbuf_written = 0;
-
-        if(clear_current) {
-            s->outbuf_bytes = 0;
-            dequeue_to_outbuf(s);
-        }
-
-        if(s->outbuf_bytes) // existing, or clear_current -> next queue entry
-            purger_connect(s);
-        else
-            s->state = PST_NOTCONN_IDLE;
-    }
-}
-
 static void purger_read_cb(struct ev_loop* loop, ev_io* w, int revents) {
     dmn_assert(loop); dmn_assert(w); dmn_assert(revents == EV_READ);
 
     purger_t* s = w->data;
     purger_assert_sanity(s);
-    dmn_log_debug("purger: %s/%s -> hit purger_read_cb()", 
dmn_logf_anysin(&s->daddr), state_strs[s->state]);
+    dmn_log_debug("purger: %s/%s -> purger_read_cb()", 
dmn_logf_anysin(&s->daddr), state_strs[s->state]);
 
     const unsigned to_recv = s->inbuf_size - s->inbuf_parsed;
     int recvrv = recv(s->fd, &s->inbuf[s->inbuf_parsed], to_recv, 0);
@@ -521,7 +488,7 @@
                 // abormal problems, mention it in the log
                 dmn_log_err("TCP conn to %s failed while reading: %s", 
dmn_logf_anysin(&s->daddr), dmn_logf_errno());
         }
-        close_from_read_cb(s, false);
+        on_txn_boundary(s, false, true);
         return;
     }
 
@@ -532,12 +499,12 @@
             dmn_log_debug("purger: %s/%s -> purger_read_cb silent result: 
server closed", dmn_logf_anysin(&s->daddr), state_strs[s->state]);
         else
             dmn_log_err("TCP conn to %s: received unexpected data from server 
during request-send or idle phases...", dmn_logf_anysin(&s->daddr));
-        close_from_read_cb(s, false);
+        on_txn_boundary(s, false, true);
         return;
     }
     else if(recvrv == 0) {
         dmn_log_err("TCP conn to %s: connection closed while waiting on 
response", dmn_logf_anysin(&s->daddr));
-        close_from_read_cb(s, false);
+        on_txn_boundary(s, false, true);
         return;
     }
 
@@ -559,7 +526,7 @@
     s->inbuf_parsed += hpe_parsed;
     if(s->parser->http_errno != HPE_OK || hpe_parsed != recvrv) { // not 
parseable, could be more trailing garbage, close it all down
         dmn_log_err("TCP conn to %s: response unparseable (parser error %s), 
dropping request", dmn_logf_anysin(&s->daddr), 
http_errno_description(s->parser->http_errno));
-        close_from_read_cb(s, true);
+        on_txn_boundary(s, true, true);
         return;
     }
     else if(pr.cb_called) { // parsed a full response
@@ -570,42 +537,17 @@
 
         // Only forward to next purger and mark sent in stats if status was 
reasonable
         if(pr.status_ok) {
-            if(s->next_purger)
+            if(s->next_purger) {
                 purger_enqueue(s->next_purger, s->outbuf, s->outbuf_bytes);
+                purger_ping(s->next_purger);
+            }
             stats.purgers[s->idx].inpkts_sent++;
         }
         else {
             dmn_log_warn("PURGE response code was was >= 400");
         }
 
-        // reset i/o
-        s->outbuf_bytes = s->outbuf_written = s->inbuf_parsed = 0;
-
-        // no matter which path, current timer needs to go
-        ev_timer_stop(s->loop, s->timeout_watcher);
-
-        if(pr.need_to_close || purger_check_fd_limits(s)) {
-            purger_closefd(s);
-            ev_io_stop(s->loop, s->read_watcher);
-            if(!dequeue_to_outbuf(s))
-                purger_connect(s);
-            else
-                s->state = PST_NOTCONN_IDLE;
-        }
-        else { // maintain connection
-            if(!dequeue_to_outbuf(s)) {
-                ev_timer_set(s->timeout_watcher, s->io_timeout, 0.);
-                ev_timer_start(s->loop, s->timeout_watcher);
-                ev_io_start(s->loop, s->write_watcher);
-                s->state = PST_SENDWAIT;
-                ev_invoke(s->loop, s->write_watcher, EV_WRITE); // predictive, 
EAGAIN if not
-            }
-            else {
-                ev_timer_set(s->timeout_watcher, s->idle_timeout, 0.);
-                ev_timer_start(s->loop, s->timeout_watcher);
-                s->state = PST_CONN_IDLE;
-            }
-        }
+        on_txn_boundary(s, true, pr.need_to_close);
     }
     else {
         // If neither of the above, parser consumed all available data and 
didn't complete the message,
@@ -618,41 +560,25 @@
     dmn_assert(loop); dmn_assert(w); dmn_assert(revents == EV_TIMER);
 
     purger_t* s = w->data;
-    dmn_log_debug("purger: %s/%s -> hit purger_timeout_cb()", 
dmn_logf_anysin(&s->daddr), state_strs[s->state]);
+    dmn_log_debug("purger: %s/%s -> purger_timeout_cb()", 
dmn_logf_anysin(&s->daddr), state_strs[s->state]);
     purger_assert_sanity(s);
 
-    // this is potentially invoked in every state except NOTCONN_IDLE...
+    // this is potentially invoked in every state
     switch(s->state) {
         case PST_CONN_IDLE:
-            s->state = PST_NOTCONN_IDLE;
-            ev_io_stop(s->loop, s->read_watcher);
-            purger_closefd(s);
+            do_reconnect_socket(s);
             break;
         case PST_CONNECTING:
             dmn_log_warn("connect() to %s timed out", 
dmn_logf_anysin(&s->daddr));
-            s->state = PST_NOTCONN_WAIT;
-            ev_io_stop(s->loop, s->write_watcher);
-            purger_closefd(s);
-            if(s->conn_wait_timeout < CONN_WAIT_MAX)
-                s->conn_wait_timeout <<= 1;
-            ev_timer_set(s->timeout_watcher, s->conn_wait_timeout, 0.);
-            ev_timer_start(s->loop, s->timeout_watcher);
+            on_connect_fail(s, "timeout", 0);
             break;
         case PST_SENDWAIT:
             dmn_log_warn("send to %s timed out", dmn_logf_anysin(&s->daddr));
-            s->outbuf_written = 0;
-            ev_io_stop(s->loop, s->write_watcher);
-            ev_io_stop(s->loop, s->read_watcher);
-            purger_closefd(s);
-            purger_connect(s);
+            on_txn_boundary(s, false, true);
             break;
         case PST_RECVWAIT:
             dmn_log_warn("recv from %s timed out after receiving %u bytes", 
dmn_logf_anysin(&s->daddr), s->inbuf_parsed);
-            s->outbuf_written = 0;
-            s->inbuf_parsed = 0;
-            ev_io_stop(s->loop, s->read_watcher);
-            purger_closefd(s);
-            purger_connect(s);
+            on_txn_boundary(s, false, true);
             break;
         case PST_NOTCONN_WAIT:
             purger_connect(s);
@@ -694,53 +620,32 @@
     ev_set_priority(s->timeout_watcher, 0);
     s->timeout_watcher->data = s;
 
+    // Initiate the first connection
+    purger_connect(s);
+
     return s;
 }
 
 void purger_enqueue(purger_t* s, const char* req, const unsigned req_len) {
     dmn_assert(s); dmn_assert(req); dmn_assert(req_len);
-    dmn_log_debug("purger: %s/%s -> hit purger_ping()", 
dmn_logf_anysin(&s->daddr), state_strs[s->state]);
+    dmn_log_debug("purger: %s/%s -> purger_enqueue()", 
dmn_logf_anysin(&s->daddr), state_strs[s->state]);
     purger_assert_sanity(s);
 
     strq_enqueue(s->queue, req, req_len);
     stats.purgers[s->idx].inpkts_enqueued++;
+}
 
-    // enqueue can happen in any state, but actions differ:
-    switch(s->state) {
-        // when in either idle state, dequeue and take action immediately
-        case PST_NOTCONN_IDLE:
-            dequeue_to_outbuf(s);
-            purger_connect(s); // state transition is conditional within
-            break;
-        case PST_CONN_IDLE:
-            dequeue_to_outbuf(s);
-            ev_io_start(s->loop, s->write_watcher);
-            ev_timer_stop(s->loop, s->timeout_watcher);
-            ev_timer_set(s->timeout_watcher, s->io_timeout, 0.);
-            ev_timer_start(s->loop, s->timeout_watcher);
-            s->state = PST_SENDWAIT;
-            break;
-
-        // When in non-idle states, there's nothing to do here.
-        //   the queue will be checked when current actions are completed later
-        case PST_CONNECTING:
-        case PST_NOTCONN_WAIT:
-        case PST_SENDWAIT:
-        case PST_RECVWAIT:
-            break;
-
-        default:
-            dmn_assert(0);
-            break;
-    }
+void purger_ping(purger_t*s) {
+    dmn_assert(s);
+    dmn_log_debug("purger: %s/%s -> purger_ping()", 
dmn_logf_anysin(&s->daddr), state_strs[s->state]);
+    // In all other states we'll check the queue later on our own...
+    if(s->state == PST_CONN_IDLE)
+        on_txn_boundary(s, false, false);
 }
 
 void purger_destroy(purger_t* s) {
-    ev_io_stop(s->loop, s->write_watcher);
-    ev_io_stop(s->loop, s->read_watcher);
     ev_timer_stop(s->loop, s->timeout_watcher);
-    if(s->fd != -1)
-        purger_closefd(s);
+    close_socket(s);
     free(s->write_watcher);
     free(s->read_watcher);
     free(s->timeout_watcher);
diff --git a/src/purger.h b/src/purger.h
index 934503b..2b6e5ee 100644
--- a/src/purger.h
+++ b/src/purger.h
@@ -15,6 +15,7 @@
 // Sender does not own the loop, the caller does.
 purger_t* purger_new(struct ev_loop* loop, const dmn_anysin_t* daddr, 
purger_t* next_purger, unsigned idx, unsigned max_mb, unsigned io_timeout, 
unsigned idle_timeout);
 void purger_enqueue(purger_t* s, const char* req, const unsigned req_len);
+void purger_ping(purger_t* s);
 void purger_destroy(purger_t* s);
 
 #endif // VHTCPD_PURGER_H
diff --git a/src/receiver.c b/src/receiver.c
index 5d0a7d6..4bb44dd 100644
--- a/src/receiver.c
+++ b/src/receiver.c
@@ -253,6 +253,7 @@
     }
 
     dmn_log_debug("receiver: done recv()ing, enqueued: %u", MAX_TIGHT_QUEUE - 
queued_ctr);
+    purger_ping(r->purger);
 }
 
 receiver_t* receiver_new(struct ev_loop* loop, const pcre* matcher, const 
pcre_extra* matcher_extra, purger_t* purger, int lsock, bool purge_full_url) {

-- 
To view, visit https://gerrit.wikimedia.org/r/384167
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Icd5e96a1bae4dac65c81f80b07f99e9388608fe0
Gerrit-PatchSet: 1
Gerrit-Project: operations/software/varnish/vhtcpd
Gerrit-Branch: master
Gerrit-Owner: BBlack <bbl...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to