BBlack has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/384167 )
Change subject: Refactor/improve purging code ...................................................................... Refactor/improve purging code This refactor removes the PST_NOTCONN_WAIT state from the state machine. The code no longer has such a state as it always reconnects immediately, even if there is no currently-pending output. Much of the rest of the logic was de-duplicated into a few new helper functions, and some of the internal state magic was made a bit more idempotent. Change-Id: Icd5e96a1bae4dac65c81f80b07f99e9388608fe0 --- M src/main.c M src/purger.c M src/purger.h M src/receiver.c 4 files changed, 172 insertions(+), 265 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/operations/software/varnish/vhtcpd refs/changes/67/384167/1 diff --git a/src/main.c b/src/main.c index 23896f7..aa338a2 100644 --- a/src/main.c +++ b/src/main.c @@ -34,7 +34,7 @@ /* global libev priorities: * 2) receiver input * 1) purger i/o - * 0) purger idle timer + * 0) purger timeouts/timers * -1) strq excess space reclamation * -2) stats/monitor stuff... */ diff --git a/src/purger.c b/src/purger.c index 80e6ff8..e0e5713 100644 --- a/src/purger.c +++ b/src/purger.c @@ -38,9 +38,6 @@ #include "http-parser/http_parser.h" // XXX some of the below could be configurable as well -// XXX note that while this is set up to try to take advantage -// of keep-alive, apparently varnish closes after every -// response to a PURGE, currently. // this buffer holds the complete HTTP response we get, and can grow at runtime #define INBUF_INITSIZE 4096U @@ -59,7 +56,7 @@ #define PERSIST_REQS 100000U #define PERSIST_TIME 900.0 -// These are the 6 possible states of the purger object. +// These are the 5 possible states of the purger object. // Note in the state transition code that we often *could* skip // straight through multiple states without returning to libev // (at least try and see if the next call doesn't EAGAIN), @@ -71,62 +68,46 @@ // but we still fully update the ev watcher states during these // to keep the code simpler to follow, even when it's likely // to be pointless churn. +// Note that all states use the timeout watcher, so we use it +// ev_timer_again() mode for efficiency (method 2 in the libev docs) typedef enum { - // In this state, no message is pending in outbuf or the queue, - // and we have no active connection and no active libev watchers. - // The only way out of NOTCONN_IDLE is a new purger_enqueue() call - // from the receiver code. - PST_NOTCONN_IDLE = 0, - - // In this state, there's a message pending in outbuf, and there - // may or may not be more in the queue, and we're in the process - // of trying to establish the outbound connection (waiting on - // nonblock connect() success). + // In this state we're in the process of trying to establish the + // outbound connection (waiting on nonblock connect() success). + // Possible next states: PST_CONN_IDLE, PST_NOTCONN_WAIT + // Active watchers: write, timeout PST_CONNECTING, - // This is an exception state that occurs when the connect() - // attempt above fails. We wait a short timeout before moving - // back to CONNECTING and trying again. Note that in other - // connection failure cases (during read/write), we immediately - // retry the connection first, and don't wait until that - // connect() attempt fails. + // While disconnects that occur from other states (send, recv, idle) + // result in an immediate reconnect attempt (connecting, above), a + // failure during connecting itself results in exponential backoff + // delays between reconnection attempts, and they wait out their + // timers in this state. + // Possible next states: PST_CONNECTING + // Active watchers: timeout PST_NOTCONN_WAIT, - // In these two states, outbuf has a complete message pending, - // and we have a live connection to use. - // In the SENDWAIT state we've written less than all bytes. - // In the RECVWAIT state we've written all bytes and haven't - // yet read a complete response. - // If we eventually succeed in both sending the complete message - // and receiving an acceptable response code, the transaction - // will finish and outbuf will be cleared. Whether we immediately - // shift back to SENDWAIT or CONN_IDLE depends on the queue. - // Various failure scenarios lead to other outcomes. Some bad - // status returns from the server should lead to dropping the - // current outbuf and moving on (possible bad URL). Some - // should result in terminating the connection but trying - // the same URL again on the next connection. + // SENDWAIT means we're waiting to send request bytes to the purger. + // Possible next states: PST_RECVWAIT, PST_CONN_IDLE, PST_CONNECTING + // Active watchers: read, write, timeout PST_SENDWAIT, + + // Possible next states: PST_SENDWAIT, PST_CONN_IDLE, PST_CONNECTING + // Active watchers: read, timeout PST_RECVWAIT, // In this state, outbuf is empty, the queue is empty, but we still // have a live HTTP connection to the server from a previous // message, and we're ready to send another message if one arrives // via the queue. - // If the server disconnects us or we hit our own idle timeout - // and disconnect from it, we'll move back to _NOTCONN and - // wait there until another message needs to be sent. If - // a new URL comes in via purger_enqueue() before that, we'll - // move straight back to _SENDWAIT (and then possibly right - // back through to _RECVWAIT as well). + // Possible next states: PST_SENDWAIT, PST_CONNECTING + // Active watchers: read, timeout PST_CONN_IDLE, } purger_state_t; // for debug logging #ifndef NDEBUG static const char* state_strs[] = { - "NOTCONN_IDLE", "CONNECTING", "NOTCONN_WAIT", "SENDWAIT", @@ -213,32 +194,17 @@ dmn_assert(s->write_watcher); dmn_assert(s->read_watcher); dmn_assert(s->timeout_watcher); + dmn_assert(ev_is_active(s->timeout_watcher)); switch(s->state) { - case PST_NOTCONN_IDLE: - dmn_assert(s->fd == -1); - dmn_assert(!s->outbuf_bytes); - dmn_assert(!s->outbuf_written); - dmn_assert(!s->inbuf_parsed); - dmn_assert(!s->fd_reqs); - dmn_assert(!ev_is_active(s->write_watcher)); - dmn_assert(!ev_is_active(s->read_watcher)); - dmn_assert(!ev_is_active(s->timeout_watcher)); - break; case PST_CONNECTING: dmn_assert(s->fd != -1); - dmn_assert(s->outbuf_bytes); - dmn_assert(!s->outbuf_written); - dmn_assert(!s->inbuf_parsed); dmn_assert(!s->fd_reqs); dmn_assert(ev_is_active(s->write_watcher)); dmn_assert(!ev_is_active(s->read_watcher)); break; case PST_NOTCONN_WAIT: dmn_assert(s->fd == -1); - dmn_assert(s->outbuf_bytes); - dmn_assert(!s->outbuf_written); - dmn_assert(!s->inbuf_parsed); dmn_assert(!s->fd_reqs); dmn_assert(!ev_is_active(s->write_watcher)); dmn_assert(!ev_is_active(s->read_watcher)); @@ -273,54 +239,127 @@ #endif -// rv: false -> placed something in outbuf -// true -> queue empty, nothing placed in outbuf -static bool dequeue_to_outbuf(purger_t* s) { - dmn_assert(s); - dmn_assert(s->outbuf); - dmn_assert(!s->outbuf_bytes); // no other packet currently buffered - dmn_assert(!s->outbuf_written); // no other packet currently buffered - - unsigned req_len; - const char* req; - req = strq_dequeue(s->queue, &req_len); - if(req) { - memcpy(s->outbuf, req, req_len); - s->outbuf_bytes = req_len; - return false; +static void close_socket(purger_t* s) { + ev_io_stop(s->loop, s->write_watcher); + ev_io_stop(s->loop, s->read_watcher); + if(s->fd != -1) { + shutdown(s->fd, SHUT_RDWR); + close(s->fd); } - - return true; -} - -// Common socket-close code -static void purger_closefd(purger_t* s) { - shutdown(s->fd, SHUT_RDWR); - close(s->fd); s->fd = -1; s->fd_expire = 0.; s->fd_reqs = 0; } -static bool purger_check_fd_limits(purger_t* s) { - return (s->fd_reqs > PERSIST_REQS) - || (s->fd_expire <= ev_now(s->loop)); +static void purger_connect(purger_t* s); + +static void do_reconnect_socket(purger_t* s) { + close_socket(s); + purger_connect(s); +} + +// rv "idle": false -> outbuf has a purge to send +// true -> queue empty, nothing in outbuf (idle-time!) +// This should work for all outbuf/queue states. +static bool _txn_prep_buffers(purger_t* s, const bool clear_current) { + dmn_assert(s); + dmn_assert(s->outbuf); + dmn_assert(s->inbuf); + + bool idle = true; + + s->inbuf_parsed = 0; + s->outbuf_written = 0; + if(clear_current) + s->outbuf_bytes = 0; + + if(s->outbuf_bytes) { + idle = false; + } else { + unsigned req_len; + const char* req; + req = strq_dequeue(s->queue, &req_len); + if(req) { + memcpy(s->outbuf, req, req_len); + s->outbuf_bytes = req_len; + idle = false; + } + } + + return idle; +} + +// Called at any txn/connection boundary (purge success/fail, connection success). +// Not called while cycling within reconnect attempt callbacks. +// If the "reconnect" argument is present, this causes a disconnect->reconnect cycle, +// otherwise it ensures the buffer has a live request if possible and moves to +// either the idle state or the sending state. "clear_current" wipes the +// currently-buffered request in the case it's suspected of being malformed. +static void on_txn_boundary(purger_t* s, const bool clear_current, const bool reconnect) { + dmn_assert(s); + + const bool idle = _txn_prep_buffers(s, clear_current); + + // force reconnect here if we pass our persistence limits + if(reconnect || s->fd_reqs > PERSIST_REQS || s->fd_expire <= ev_now(s->loop)) { + do_reconnect_socket(s); + } else if(idle) { + s->state = PST_CONN_IDLE; + ev_io_stop(s->loop, s->write_watcher); + s->timeout_watcher->repeat = s->io_timeout; + ev_timer_again(s->loop, s->timeout_watcher); + } else { + s->state = PST_SENDWAIT; + ev_io_start(s->loop, s->write_watcher); + s->timeout_watcher->repeat = s->io_timeout; + ev_timer_again(s->loop, s->timeout_watcher); + ev_invoke(s->loop, s->write_watcher, EV_WRITE); + } +} + +static void on_connect_success(purger_t* s) { + dmn_assert(s); + dmn_assert(s->state == PST_CONNECTING); + + s->fd_expire = ev_now(s->loop) + PERSIST_TIME; + s->conn_wait_timeout = CONN_WAIT_INIT; + ev_io_set(s->read_watcher, s->fd, EV_READ); + ev_io_start(s->loop, s->read_watcher); + on_txn_boundary(s, false, false); +} + +static void on_connect_fail(purger_t* s, const char* reason, const int so_error) { + dmn_assert(s); + dmn_assert(s->state == PST_CONNECTING); + + dmn_log_err( + "TCP connect to %s failed (%s): %s", + dmn_logf_anysin(&s->daddr), reason, + so_error ? dmn_logf_errnum(so_error) : dmn_logf_errno() + ); + close_socket(s); + s->state = PST_NOTCONN_WAIT; + if(s->conn_wait_timeout < CONN_WAIT_MAX) + s->conn_wait_timeout <<= 1; + s->timeout_watcher->repeat = s->conn_wait_timeout; + ev_timer_again(s->loop, s->timeout_watcher); } static void purger_connect(purger_t* s) { dmn_assert(s); - dmn_log_debug("purger: %s/%s -> hit purger_connect()", dmn_logf_anysin(&s->daddr), state_strs[s->state]); + dmn_log_debug("purger: %s/%s -> purger_connect()", dmn_logf_anysin(&s->daddr), state_strs[s->state]); - // we arrive in this function from several states/callbacks, but - // in all cases they should put us in this intermediate state first: + // we arrive in this function from several states/callbacks, but in + // all cases they should put us in this intermediate state first (no + // half-processed in or out buffers, no active i/o watchers, no + // socket fd) dmn_assert(s->fd == -1); - dmn_assert(s->outbuf_bytes); - dmn_assert(!s->outbuf_written); - dmn_assert(!s->inbuf_parsed); - dmn_assert(!ev_is_active(s->timeout_watcher)); dmn_assert(!ev_is_active(s->read_watcher)); dmn_assert(!ev_is_active(s->write_watcher)); + + // set our proper state during connection attempts + s->state = PST_CONNECTING; // cache the protoent, because in many cases this blocks reading a database... static struct protoent* pe = NULL; @@ -347,36 +386,18 @@ // a nonblocking socket, but it's possible it succeeds immediately for localhost... if(connect(s->fd, &s->daddr.sa, s->daddr.len) == -1) { if(errno != EINPROGRESS) { - // hard/fast failure - dmn_log_err("TCP connect to %s failed: %s", dmn_logf_anysin(&s->daddr), dmn_logf_errno()); - purger_closefd(s); - s->state = PST_NOTCONN_WAIT; - if(s->conn_wait_timeout < CONN_WAIT_MAX) - s->conn_wait_timeout <<= 1; - ev_timer_set(s->timeout_watcher, s->conn_wait_timeout, 0.); - ev_timer_start(s->loop, s->timeout_watcher); + on_connect_fail(s, "immediate", 0); } else { // return to libev until connection is ready - s->state = PST_CONNECTING; ev_io_set(s->write_watcher, s->fd, EV_WRITE); ev_io_start(s->loop, s->write_watcher); - ev_timer_set(s->timeout_watcher, s->io_timeout, 0.); - ev_timer_start(s->loop, s->timeout_watcher); + s->timeout_watcher->repeat = s->io_timeout; + ev_timer_again(s->loop, s->timeout_watcher); } } - else { - // immediate success, straight to send attempt - s->fd_expire = ev_now(s->loop) + PERSIST_TIME; - s->state = PST_SENDWAIT; - s->conn_wait_timeout = CONN_WAIT_INIT; - ev_io_set(s->write_watcher, s->fd, EV_WRITE); - ev_io_start(s->loop, s->write_watcher); - ev_io_set(s->read_watcher, s->fd, EV_READ); - ev_io_start(s->loop, s->read_watcher); - ev_timer_set(s->timeout_watcher, s->io_timeout, 0.); - ev_timer_start(s->loop, s->timeout_watcher); - ev_invoke(s->loop, s->write_watcher, EV_WRITE); + else { // immediately-successful connect! + on_connect_success(s); } } @@ -384,7 +405,7 @@ dmn_assert(loop); dmn_assert(w); dmn_assert(revents == EV_WRITE); purger_t* s = w->data; - dmn_log_debug("purger: %s/%s -> hit purger_write_cb()", dmn_logf_anysin(&s->daddr), state_strs[s->state]); + dmn_log_debug("purger: %s/%s -> purger_write_cb()", dmn_logf_anysin(&s->daddr), state_strs[s->state]); purger_assert_sanity(s); // This callback only happens in two states: CONNECTING and SENDWAIT... @@ -393,25 +414,11 @@ int so_error = 0; unsigned int so_error_len = sizeof(so_error); getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_len); - if(so_error) { - dmn_log_err("TCP connect() failed: %s", dmn_logf_errnum(so_error)); - purger_closefd(s); - s->state = PST_NOTCONN_WAIT; - if(s->conn_wait_timeout < CONN_WAIT_MAX) - s->conn_wait_timeout <<= 1; - ev_io_stop(s->loop, s->write_watcher); - ev_timer_stop(s->loop, s->timeout_watcher); - ev_timer_set(s->timeout_watcher, s->conn_wait_timeout, 0.); - ev_timer_start(s->loop, s->timeout_watcher); - return; - } - else { // successful connect(), alter state and fall through to the first send attempt - s->fd_expire = ev_now(s->loop) + PERSIST_TIME; - s->state = PST_SENDWAIT; - s->conn_wait_timeout = CONN_WAIT_INIT; - ev_io_set(s->read_watcher, s->fd, EV_READ); - ev_io_start(s->loop, s->read_watcher); - } + if(so_error) + on_connect_fail(s, "nonblock", so_error); + else + on_connect_success(s); + return; } dmn_assert(s->state == PST_SENDWAIT); @@ -436,13 +443,8 @@ dmn_log_err("TCP conn to %s failed while writing: %s", dmn_logf_anysin(&s->daddr), dmn_logf_errno()); } - // reset state to try this send from the top on a fresh connection... - s->outbuf_written = 0; - purger_closefd(s); - ev_io_stop(s->loop, s->write_watcher); - ev_io_stop(s->loop, s->read_watcher); - ev_timer_stop(s->loop, s->timeout_watcher); - purger_connect(s); + // close up connection and reconnect, do not clear current output buffer + on_txn_boundary(s, false, true); } else { if(writerv < (int)to_send) { @@ -459,47 +461,12 @@ } } -// Common "clean up connection" code for multiple points in the read_cb below -// If clear_current is true, wipe the current request and decide next state -// based on presence of another queued request, otherwise reconnect to -// re-send the current request. -static void close_from_read_cb(purger_t* s, const bool clear_current) { - dmn_assert(s); - dmn_assert(s->state == PST_CONN_IDLE || s->state == PST_SENDWAIT || s->state == PST_RECVWAIT); - - purger_closefd(s); - ev_timer_stop(s->loop, s->timeout_watcher); - ev_io_stop(s->loop, s->read_watcher); - - if(s->state == PST_CONN_IDLE) { - dmn_assert(!clear_current); // there was no "current" buffer output - s->state = PST_NOTCONN_IDLE; - } - else { // SENDWAIT or RECVWAIT - if(s->state == PST_SENDWAIT) - ev_io_stop(s->loop, s->write_watcher); - else - s->inbuf_parsed = 0; - s->outbuf_written = 0; - - if(clear_current) { - s->outbuf_bytes = 0; - dequeue_to_outbuf(s); - } - - if(s->outbuf_bytes) // existing, or clear_current -> next queue entry - purger_connect(s); - else - s->state = PST_NOTCONN_IDLE; - } -} - static void purger_read_cb(struct ev_loop* loop, ev_io* w, int revents) { dmn_assert(loop); dmn_assert(w); dmn_assert(revents == EV_READ); purger_t* s = w->data; purger_assert_sanity(s); - dmn_log_debug("purger: %s/%s -> hit purger_read_cb()", dmn_logf_anysin(&s->daddr), state_strs[s->state]); + dmn_log_debug("purger: %s/%s -> purger_read_cb()", dmn_logf_anysin(&s->daddr), state_strs[s->state]); const unsigned to_recv = s->inbuf_size - s->inbuf_parsed; int recvrv = recv(s->fd, &s->inbuf[s->inbuf_parsed], to_recv, 0); @@ -521,7 +488,7 @@ // abormal problems, mention it in the log dmn_log_err("TCP conn to %s failed while reading: %s", dmn_logf_anysin(&s->daddr), dmn_logf_errno()); } - close_from_read_cb(s, false); + on_txn_boundary(s, false, true); return; } @@ -532,12 +499,12 @@ dmn_log_debug("purger: %s/%s -> purger_read_cb silent result: server closed", dmn_logf_anysin(&s->daddr), state_strs[s->state]); else dmn_log_err("TCP conn to %s: received unexpected data from server during request-send or idle phases...", dmn_logf_anysin(&s->daddr)); - close_from_read_cb(s, false); + on_txn_boundary(s, false, true); return; } else if(recvrv == 0) { dmn_log_err("TCP conn to %s: connection closed while waiting on response", dmn_logf_anysin(&s->daddr)); - close_from_read_cb(s, false); + on_txn_boundary(s, false, true); return; } @@ -559,7 +526,7 @@ s->inbuf_parsed += hpe_parsed; if(s->parser->http_errno != HPE_OK || hpe_parsed != recvrv) { // not parseable, could be more trailing garbage, close it all down dmn_log_err("TCP conn to %s: response unparseable (parser error %s), dropping request", dmn_logf_anysin(&s->daddr), http_errno_description(s->parser->http_errno)); - close_from_read_cb(s, true); + on_txn_boundary(s, true, true); return; } else if(pr.cb_called) { // parsed a full response @@ -570,42 +537,17 @@ // Only forward to next purger and mark sent in stats if status was reasonable if(pr.status_ok) { - if(s->next_purger) + if(s->next_purger) { purger_enqueue(s->next_purger, s->outbuf, s->outbuf_bytes); + purger_ping(s->next_purger); + } stats.purgers[s->idx].inpkts_sent++; } else { dmn_log_warn("PURGE response code was was >= 400"); } - // reset i/o - s->outbuf_bytes = s->outbuf_written = s->inbuf_parsed = 0; - - // no matter which path, current timer needs to go - ev_timer_stop(s->loop, s->timeout_watcher); - - if(pr.need_to_close || purger_check_fd_limits(s)) { - purger_closefd(s); - ev_io_stop(s->loop, s->read_watcher); - if(!dequeue_to_outbuf(s)) - purger_connect(s); - else - s->state = PST_NOTCONN_IDLE; - } - else { // maintain connection - if(!dequeue_to_outbuf(s)) { - ev_timer_set(s->timeout_watcher, s->io_timeout, 0.); - ev_timer_start(s->loop, s->timeout_watcher); - ev_io_start(s->loop, s->write_watcher); - s->state = PST_SENDWAIT; - ev_invoke(s->loop, s->write_watcher, EV_WRITE); // predictive, EAGAIN if not - } - else { - ev_timer_set(s->timeout_watcher, s->idle_timeout, 0.); - ev_timer_start(s->loop, s->timeout_watcher); - s->state = PST_CONN_IDLE; - } - } + on_txn_boundary(s, true, pr.need_to_close); } else { // If neither of the above, parser consumed all available data and didn't complete the message, @@ -618,41 +560,25 @@ dmn_assert(loop); dmn_assert(w); dmn_assert(revents == EV_TIMER); purger_t* s = w->data; - dmn_log_debug("purger: %s/%s -> hit purger_timeout_cb()", dmn_logf_anysin(&s->daddr), state_strs[s->state]); + dmn_log_debug("purger: %s/%s -> purger_timeout_cb()", dmn_logf_anysin(&s->daddr), state_strs[s->state]); purger_assert_sanity(s); - // this is potentially invoked in every state except NOTCONN_IDLE... + // this is potentially invoked in every state switch(s->state) { case PST_CONN_IDLE: - s->state = PST_NOTCONN_IDLE; - ev_io_stop(s->loop, s->read_watcher); - purger_closefd(s); + do_reconnect_socket(s); break; case PST_CONNECTING: dmn_log_warn("connect() to %s timed out", dmn_logf_anysin(&s->daddr)); - s->state = PST_NOTCONN_WAIT; - ev_io_stop(s->loop, s->write_watcher); - purger_closefd(s); - if(s->conn_wait_timeout < CONN_WAIT_MAX) - s->conn_wait_timeout <<= 1; - ev_timer_set(s->timeout_watcher, s->conn_wait_timeout, 0.); - ev_timer_start(s->loop, s->timeout_watcher); + on_connect_fail(s, "timeout", 0); break; case PST_SENDWAIT: dmn_log_warn("send to %s timed out", dmn_logf_anysin(&s->daddr)); - s->outbuf_written = 0; - ev_io_stop(s->loop, s->write_watcher); - ev_io_stop(s->loop, s->read_watcher); - purger_closefd(s); - purger_connect(s); + on_txn_boundary(s, false, true); break; case PST_RECVWAIT: dmn_log_warn("recv from %s timed out after receiving %u bytes", dmn_logf_anysin(&s->daddr), s->inbuf_parsed); - s->outbuf_written = 0; - s->inbuf_parsed = 0; - ev_io_stop(s->loop, s->read_watcher); - purger_closefd(s); - purger_connect(s); + on_txn_boundary(s, false, true); break; case PST_NOTCONN_WAIT: purger_connect(s); @@ -694,53 +620,32 @@ ev_set_priority(s->timeout_watcher, 0); s->timeout_watcher->data = s; + // Initiate the first connection + purger_connect(s); + return s; } void purger_enqueue(purger_t* s, const char* req, const unsigned req_len) { dmn_assert(s); dmn_assert(req); dmn_assert(req_len); - dmn_log_debug("purger: %s/%s -> hit purger_ping()", dmn_logf_anysin(&s->daddr), state_strs[s->state]); + dmn_log_debug("purger: %s/%s -> purger_enqueue()", dmn_logf_anysin(&s->daddr), state_strs[s->state]); purger_assert_sanity(s); strq_enqueue(s->queue, req, req_len); stats.purgers[s->idx].inpkts_enqueued++; +} - // enqueue can happen in any state, but actions differ: - switch(s->state) { - // when in either idle state, dequeue and take action immediately - case PST_NOTCONN_IDLE: - dequeue_to_outbuf(s); - purger_connect(s); // state transition is conditional within - break; - case PST_CONN_IDLE: - dequeue_to_outbuf(s); - ev_io_start(s->loop, s->write_watcher); - ev_timer_stop(s->loop, s->timeout_watcher); - ev_timer_set(s->timeout_watcher, s->io_timeout, 0.); - ev_timer_start(s->loop, s->timeout_watcher); - s->state = PST_SENDWAIT; - break; - - // When in non-idle states, there's nothing to do here. - // the queue will be checked when current actions are completed later - case PST_CONNECTING: - case PST_NOTCONN_WAIT: - case PST_SENDWAIT: - case PST_RECVWAIT: - break; - - default: - dmn_assert(0); - break; - } +void purger_ping(purger_t*s) { + dmn_assert(s); + dmn_log_debug("purger: %s/%s -> purger_ping()", dmn_logf_anysin(&s->daddr), state_strs[s->state]); + // In all other states we'll check the queue later on our own... + if(s->state == PST_CONN_IDLE) + on_txn_boundary(s, false, false); } void purger_destroy(purger_t* s) { - ev_io_stop(s->loop, s->write_watcher); - ev_io_stop(s->loop, s->read_watcher); ev_timer_stop(s->loop, s->timeout_watcher); - if(s->fd != -1) - purger_closefd(s); + close_socket(s); free(s->write_watcher); free(s->read_watcher); free(s->timeout_watcher); diff --git a/src/purger.h b/src/purger.h index 934503b..2b6e5ee 100644 --- a/src/purger.h +++ b/src/purger.h @@ -15,6 +15,7 @@ // Sender does not own the loop, the caller does. purger_t* purger_new(struct ev_loop* loop, const dmn_anysin_t* daddr, purger_t* next_purger, unsigned idx, unsigned max_mb, unsigned io_timeout, unsigned idle_timeout); void purger_enqueue(purger_t* s, const char* req, const unsigned req_len); +void purger_ping(purger_t* s); void purger_destroy(purger_t* s); #endif // VHTCPD_PURGER_H diff --git a/src/receiver.c b/src/receiver.c index 5d0a7d6..4bb44dd 100644 --- a/src/receiver.c +++ b/src/receiver.c @@ -253,6 +253,7 @@ } dmn_log_debug("receiver: done recv()ing, enqueued: %u", MAX_TIGHT_QUEUE - queued_ctr); + purger_ping(r->purger); } receiver_t* receiver_new(struct ev_loop* loop, const pcre* matcher, const pcre_extra* matcher_extra, purger_t* purger, int lsock, bool purge_full_url) { -- To view, visit https://gerrit.wikimedia.org/r/384167 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Icd5e96a1bae4dac65c81f80b07f99e9388608fe0 Gerrit-PatchSet: 1 Gerrit-Project: operations/software/varnish/vhtcpd Gerrit-Branch: master Gerrit-Owner: BBlack <bbl...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits