Hello,
The attached patch fixes several ConnOpener problems by relying on
AsyncJob protections while maintaining a tighter grip on various I/O and
sleep states. It is in PREVIEW state because I would like to do more
testing, but it did pass basic tests, and I am not currently aware of
serious problems with the patch code.
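The "AsyncJob protections" matter most where legacy C-style callbacks must re-enter the job. In the patch these are the Comm::SetSelect() and eventAdd() handlers, InProgressConnectRetry and DelayedConnectRetry. Below is a minimal sketch of that trampoline pattern. It uses std::weak_ptr as a stand-in for Squid's CbcPointer and a plain std::deque as a stand-in for the async call queue. Job, legacyRetry, and runAsyncCalls are hypothetical names, not Squid APIs.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for Squid's async call queue: queued calls run later
// from the "main loop", never synchronously inside a legacy callback.
std::deque<std::function<void()>> asyncQueue;

struct Job {
    int connectCalls = 0;
    void connect() { ++connectCalls; }
};

// Legacy C-style callback. It owns `data`, a heap-allocated weak pointer
// playing the role of `new Pointer(this)` in the patch.
void legacyRetry(void *data) {
    auto *ptr = static_cast<std::weak_ptr<Job> *>(data);
    assert(ptr);
    if (!ptr->expired()) {
        // Do not call into the job here; schedule a call instead.
        std::weak_ptr<Job> weak = *ptr;
        asyncQueue.push_back([weak] {
            // Liveness is checked again when the call is dialed.
            if (std::shared_ptr<Job> job = weak.lock())
                job->connect();
        });
    }
    delete ptr; // freed whether or not the job is still alive
}

// Drains the queue the way a main loop would; returns how many calls ran.
int runAsyncCalls() {
    int ran = 0;
    while (!asyncQueue.empty()) {
        std::function<void()> call = std::move(asyncQueue.front());
        asyncQueue.pop_front();
        call();
        ++ran;
    }
    return ran;
}
```

The raw callback never touches job state directly. It only schedules a call, and the job is looked up again at dial time. This roughly mirrors how Squid's job dialers refuse to dial a call whose job is gone.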
I started with Rainer Weikusat's timeout polishing patch posted
yesterday, but all bugs are mine.
Here are some of the addressed problems:
* Connection descriptor was not closed when attempting to reconnect
after failures. We now properly close on failures, sleep with descriptor
closed, and then reopen.
* Timeout handler was not cleaned up properly in some cases, causing
memory leaks (of the handler Pointer) and possibly timeouts that fired
(for the then-active handler) after the connection was passed to the
initiator.
* Comm close handler was not cleaned up properly.
* Connection timeout was enforced for each connection attempt instead of
all attempts together.
and possibly other problems. The full extent of all side-effects of
mishandled race conditions and state conflicts is probably unknown.
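The first and last items above change the retry loop in two ways. First, no descriptor is held while sleeping between attempts. Second, a single deadline bounds all attempts together; in the patch this is deadline_, computed once in the constructor. Below is a toy C++ model of that loop. The integer clock, RetryingOpener, and the attempt callback are hypothetical illustration devices, not Squid code. The real ConnOpener drives these steps through asynchronous events rather than a loop.

```cpp
#include <cassert>
#include <functional>

enum class Result { Ok, Failed, TimedOut };

// Toy model only: time is a simulated integer clock advanced by the
// attempt callback and by the fixed pause between attempts.
struct RetryingOpener {
    int now = 0;                 // simulated current time (seconds)
    int deadline;                // fixed once; covers all attempts together
    int maxFailures;             // analogous to Config.connect_retries
    int sleepFor = 1;            // simulated pause between attempts
    bool fdOpen = false;         // is a descriptor currently held?
    int opens = 0, closes = 0;   // bookkeeping to check descriptor hygiene

    RetryingOpener(int timeout, int retries)
        : deadline(timeout), maxFailures(retries) {}

    // attempt(now) returns true on success and may advance `now`.
    Result run(const std::function<bool(int &now)> &attempt) {
        int failures = 0;
        for (;;) {
            fdOpen = true; ++opens;          // open(): fresh descriptor per attempt
            if (attempt(now))
                return Result::Ok;           // keepFd(): descriptor handed over
            fdOpen = false; ++closes;        // closeFd() before sleeping
            if (now >= deadline)
                return Result::TimedOut;     // one deadline for all attempts
            if (++failures >= maxFailures)
                return Result::Failed;       // retries exhausted
            now += sleepFor;                 // sleep() with no descriptor held
            if (now >= deadline)
                return Result::TimedOut;
        }
    }
};
```

Suppose each attempt takes 4 simulated seconds against a 10-second budget. The model then gives up with TimedOut after two attempts, although no single attempt exceeded the budget. A per-attempt timeout would miss exactly this case.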
TODO:
* Needs more testing, especially around corner cases.
* Does somebody need more specific callback cancellation reasons?
* Consider calling comm_close instead of direct write_data cleanup.
* Make connect_timeout documentation in squid.conf less ambiguous.
* Move prevalent conn_ debugging to the status() method?
* Polish Comm timeout handling to always reset .timeout on callback?
* Consider revising eventDelete() to delete the between-I/O sleep
timeout.
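The timeout and close handler items, and the TODO about direct write_data cleanup, come down to one ownership rule that the patch's cleanFd() follows. Whoever deregisters a handler that never fired must also free that handler's data. Every registration must be undone before the descriptor is closed or handed over: the write handler, the timeout handler together with its .timeout value, and the close handler. Below is a toy C++ model of that rule, with a live-object counter to detect leaks. Entry, Token, and cleanEntry are hypothetical names loosely modelled on fde fields, not Squid's real layout.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Counts heap objects that have not been freed yet.
struct Token {
    static int live;
    Token() { ++live; }
    ~Token() { --live; }
};
int Token::live = 0;

using Handler = void (*)(void *data);

// Toy descriptor-table entry with the fields the cleanup must reset.
struct Entry {
    Handler writeHandler = nullptr;
    void *writeData = nullptr;     // freed by the handler if it fires
    int timeoutHandlerId = 0;      // 0 means "no timeout handler"
    long timeout = 0;              // absolute deadline; 0 means "none"
    std::vector<int> closeHandlers;
};

// A write handler that frees its own data when it fires.
void onWritable(void *data) { delete static_cast<Token *>(data); }

// Undo every registration the job made on a descriptor it is releasing.
void cleanEntry(Entry &e, int myCloseHandler) {
    if (e.writeHandler) {
        // The handler never ran, so nobody else will free its data.
        delete static_cast<Token *>(e.writeData);
    }
    e.writeHandler = nullptr;
    e.writeData = nullptr;
    e.timeoutHandlerId = 0;        // clear the handler and its deadline
    e.timeout = 0;
    e.closeHandlers.erase(
        std::remove(e.closeHandlers.begin(), e.closeHandlers.end(), myCloseHandler),
        e.closeHandlers.end());
}
```

The data is freed only while the handler is still registered. This avoids a double free when the handler has already fired and released its own data.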
Feedback welcomed.
Thank you,
Alex.
Author: Alex Rousskov <[email protected]>
Author: Rainer Weikusat <[email protected]>
Fixed several ConnOpener problems by relying on AsyncJob protections
while maintaining a tighter grip on various I/O and sleep states.
* Connection descriptor was not closed when attempting to reconnect after
failures. We now properly close on failures, sleep with descriptor closed,
and then reopen.
* Timeout handler was not cleaned up properly in some cases, causing memory
leaks (of the handler Pointer) and possibly timeouts that fired (for the
then-active handler) after the connection was passed to the initiator.
* Comm close handler was not cleaned up properly.
* Connection timeout was enforced for each connection attempt instead of all
attempts together.
and possibly other problems. The full extent of all side-effects of mishandled
race conditions and state conflicts is probably unknown.
TODO:
* Needs more testing, especially around corner cases.
* Does somebody need more specific callback cancellation reasons?
* Consider calling comm_close instead of direct write_data cleanup.
* Make connect_timeout documentation in squid.conf less ambiguous.
* Move prevalent conn_ debugging to the status() method?
* Polish Comm timeout handling to always reset .timeout on callback?
* Consider revising eventDelete() to delete the between-I/O sleep timeout.
=== modified file 'src/comm/ConnOpener.cc'
--- src/comm/ConnOpener.cc 2013-01-16 10:35:54 +0000
+++ src/comm/ConnOpener.cc 2013-01-25 01:42:09 +0000
@@ -16,343 +16,422 @@
#include "ip/tools.h"
#include "SquidConfig.h"
#include "SquidTime.h"
#if HAVE_ERRNO_H
#include <errno.h>
#endif
class CachePeer;
CBDATA_NAMESPACED_CLASS_INIT(Comm, ConnOpener);
Comm::ConnOpener::ConnOpener(Comm::ConnectionPointer &c, AsyncCall::Pointer &handler, time_t ctimeout) :
AsyncJob("Comm::ConnOpener"),
host_(NULL),
temporaryFd_(-1),
conn_(c),
callback_(handler),
totalTries_(0),
failRetries_(0),
- connectTimeout_(ctimeout),
- connectStart_(0)
+ deadline_(squid_curtime + static_cast<time_t>(ctimeout))
{}
Comm::ConnOpener::~ConnOpener()
{
safe_free(host_);
}
bool
Comm::ConnOpener::doneAll() const
{
// is the conn_ to be opened still waiting?
if (conn_ == NULL) {
return AsyncJob::doneAll();
}
// is the callback still to be called?
if (callback_ == NULL || callback_->canceled()) {
return AsyncJob::doneAll();
}
+ // otherwise, we must be waiting for something
+ Must(temporaryFd_ >= 0 || calls_.sleep_);
return false;
}
void
Comm::ConnOpener::swanSong()
{
- // cancel any event watchers
- // done here to get the "swanSong" mention in cancel debugging.
- if (calls_.earlyAbort_ != NULL) {
- calls_.earlyAbort_->cancel("Comm::ConnOpener::swanSong");
- calls_.earlyAbort_ = NULL;
- }
- if (calls_.timeout_ != NULL) {
- calls_.timeout_->cancel("Comm::ConnOpener::swanSong");
- calls_.timeout_ = NULL;
- }
-
if (callback_ != NULL) {
- if (callback_->canceled())
- callback_ = NULL;
- else
- // inform the still-waiting caller we are dying
- doneConnecting(COMM_ERR_CONNECT, 0);
+ // inform the still-waiting caller we are dying
+ sendAnswer(COMM_ERR_CONNECT, 0, "Comm::ConnOpener::swanSong");
}
- // rollback what we can from the job state
- if (temporaryFd_ >= 0) {
- // doneConnecting() handles partial FD connection cleanup
- doneConnecting(COMM_ERR_CONNECT, 0);
- }
+ if (temporaryFd_ >= 0)
+ closeFd();
+
+ if (calls_.sleep_)
+ cancelSleep();
AsyncJob::swanSong();
}
void
Comm::ConnOpener::setHost(const char * new_host)
{
// unset and erase if already set.
if (host_ != NULL)
safe_free(host_);
// set the new one if given.
if (new_host != NULL)
host_ = xstrdup(new_host);
}
const char *
Comm::ConnOpener::getHost() const
{
return host_;
}
/**
* Connection attempt are completed. One way or the other.
* Pass the results back to the external handler.
- * NP: on errors the earlyAbort call should be cancelled first with a reason.
*/
void
-Comm::ConnOpener::doneConnecting(comm_err_t errFlag, int xerrno)
+Comm::ConnOpener::sendAnswer(comm_err_t errFlag, int xerrno, const char *why)
{
// only mark the address good/bad AFTER connect is finished.
if (host_ != NULL) {
- if (xerrno == 0)
+ if (xerrno == 0) // XXX: should we not use errFlag instead?
ipcacheMarkGoodAddr(host_, conn_->remote);
else {
ipcacheMarkBadAddr(host_, conn_->remote);
#if USE_ICMP
if (Config.onoff.test_reachability)
netdbDeleteAddrNetwork(conn_->remote);
#endif
}
}
if (callback_ != NULL) {
typedef CommConnectCbParams Params;
Params &params = GetCommParams<Params>(callback_);
params.conn = conn_;
params.flag = errFlag;
params.xerrno = xerrno;
ScheduleCallHere(callback_);
callback_ = NULL;
}
- if (temporaryFd_ >= 0) {
- debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_);
- // it never reached fully open, so cleanup the FD handlers
- // Note that comm_close() sequence does not happen for partially open FD
- Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, NULL, NULL, 0);
+ // The job will stop without this call because nil callback_ makes
+ // doneAll() true, but this explicit call creates nicer debugging.
+ mustStop(why);
+}
+
+/// cleans up this job's I/O state without closing temporaryFd_;
+/// required before closing temporaryFd_ or keeping it in conn_;
+/// leaves the FD bare, so it must only be called via closeFd() or keepFd()
+void
+Comm::ConnOpener::cleanFd()
+{
+ debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_);
+
+ Must(temporaryFd_ >= 0);
+ fde &f = fd_table[temporaryFd_];
+
+ // XXX: I think we can and should comm_close() instead.
+ if (f.write_handler) {
+
+ /* XXX: We are about to remove write_handler, which was responsible
+ * for deleting write_data, so we have to delete write_data
+ * ourselves. Comm currently calls SetSelect handlers synchronously
+ * so if write_handler is set, we know it has not been called yet.
+ * ConnOpener converts that sync call into an async one, but only
+ * after deleting ptr, so that is not a problem.
+ */
+
+ delete static_cast<Pointer*>(f.write_data);
+ f.write_data = NULL;
+ }
+ // Comm does not do this when calling and resetting write_handler,
+ // so we probably should do this even if write_handler is already NULL.
+ Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, NULL, NULL, 0);
+
+ if (calls_.timeout_ != NULL) {
+ calls_.timeout_->cancel("Comm::ConnOpener::cleanFd");
+ calls_.timeout_ = NULL;
+ }
+ // Comm does not clear .timeout when calling/resetting timeoutHandler,
+ // so we probably should do that unconditionally. (XXX: Comm should!)
+ f.timeoutHandler = NULL;
+ f.timeout = 0;
+
+ if (calls_.earlyAbort_ != NULL) {
+ comm_remove_close_handler(temporaryFd_, calls_.earlyAbort_);
calls_.earlyAbort_ = NULL;
- if (calls_.timeout_ != NULL) {
- calls_.timeout_->cancel("Comm::ConnOpener::doneConnecting");
- calls_.timeout_ = NULL;
- }
- fd_table[temporaryFd_].timeoutHandler = NULL;
- fd_table[temporaryFd_].timeout = 0;
- close(temporaryFd_);
- fd_close(temporaryFd_);
- temporaryFd_ = -1;
}
+}
+
+/// cleans I/O state and ends I/O for temporaryFd_
+void
+Comm::ConnOpener::closeFd()
+{
+ if (temporaryFd_ < 0)
+ return;
+
+ cleanFd();
+
+ close(temporaryFd_);
+ fd_close(temporaryFd_);
+ temporaryFd_ = -1;
+}
+
+/// cleans I/O state and moves temporaryFd_ to the conn_ for long-term use
+void
+Comm::ConnOpener::keepFd()
+{
+ Must(conn_ != NULL);
+ Must(temporaryFd_ >= 0);
- /* ensure cleared local state, we are done. */
- conn_ = NULL;
+ cleanFd();
+
+ conn_->fd = temporaryFd_;
+ temporaryFd_ = -1;
}
void
Comm::ConnOpener::start()
{
Must(conn_ != NULL);
- /* get a socket open ready for connecting with */
+ /* outbound sockets have no need to be protocol agnostic. */
+ if (!(Ip::EnableIpv6&IPV6_SPECIAL_V4MAPPING) && conn_->remote.IsIPv4()) {
+ conn_->local.SetIPv4();
+ }
+
+ if (open())
+ connect();
+}
+
+/// called at the end of Comm::ConnOpener::DelayedConnectRetry event
+void
+Comm::ConnOpener::restart() {
+ debugs(5, 5, conn_ << " restarting after sleep");
+ calls_.sleep_ = false;
+
+ if (open())
+ connect();
+}
+
+/// create a socket for the future connection
+bool
+Comm::ConnOpener::open()
+{
+ Must(temporaryFd_ < 0);
+
+ // our initiators signal abort by cancelling their callbacks
+ if (callback_ == NULL || callback_->canceled())
+ return false;
+
+ temporaryFd_ = comm_openex(SOCK_STREAM, IPPROTO_TCP, conn_->local, conn_->flags, conn_->tos, conn_->nfmark, host_);
if (temporaryFd_ < 0) {
- /* outbound sockets have no need to be protocol agnostic. */
- if (!(Ip::EnableIpv6&IPV6_SPECIAL_V4MAPPING) && conn_->remote.IsIPv4()) {
- conn_->local.SetIPv4();
- }
- temporaryFd_ = comm_openex(SOCK_STREAM, IPPROTO_TCP, conn_->local, conn_->flags, conn_->tos, conn_->nfmark, host_);
- if (temporaryFd_ < 0) {
- doneConnecting(COMM_ERR_CONNECT, 0);
- return;
- }
+ sendAnswer(COMM_ERR_CONNECT, 0, "Comm::ConnOpener::open");
+ return false;
}
typedef CommCbMemFunT<Comm::ConnOpener, CommCloseCbParams> abortDialer;
calls_.earlyAbort_ = JobCallback(5, 4, abortDialer, this, Comm::ConnOpener::earlyAbort);
comm_add_close_handler(temporaryFd_, calls_.earlyAbort_);
typedef CommCbMemFunT<Comm::ConnOpener, CommTimeoutCbParams> timeoutDialer;
calls_.timeout_ = JobCallback(5, 4, timeoutDialer, this, Comm::ConnOpener::timeout);
- debugs(5, 3, HERE << conn_ << " timeout " << connectTimeout_);
+ debugs(5, 3, conn_ << " will timeout in " << (deadline_ - squid_curtime));
- // Update the fd_table directly because conn_ is not yet storing the FD
+ // Update the fd_table directly because commSetConnTimeout() needs open conn_
assert(temporaryFd_ < Squid_MaxFD);
assert(fd_table[temporaryFd_].flags.open);
typedef CommTimeoutCbParams Params;
Params &params = GetCommParams<Params>(calls_.timeout_);
params.conn = conn_;
fd_table[temporaryFd_].timeoutHandler = calls_.timeout_;
- fd_table[temporaryFd_].timeout = squid_curtime + (time_t) connectTimeout_;
+ fd_table[temporaryFd_].timeout = deadline_;
- connectStart_ = squid_curtime;
- connect();
+ return true;
}
void
Comm::ConnOpener::connected()
{
- conn_->fd = temporaryFd_;
- temporaryFd_ = -1;
+ Must(temporaryFd_ >= 0);
+ keepFd();
/*
* stats.conn_open is used to account for the number of
* connections that we have open to the CachePeer, so we can limit
* based on the max-conn option. We need to increment here,
* even if the connection may fail.
*/
if (CachePeer *peer=(conn_->getPeer()))
++peer->stats.conn_open;
lookupLocalAddress();
/* TODO: remove these fd_table accesses. But old code still depends on fd_table flags to
* indicate the state of a raw fd object being passed around.
* Also, legacy code still depends on comm_local_port() with no access to Comm::Connection
* when those are done comm_local_port can become one of our member functions to do the below.
*/
- fd_table[conn_->fd].flags.open = 1;
+ Must(fd_table[conn_->fd].flags.open);
fd_table[conn_->fd].local_addr = conn_->local;
+
+ sendAnswer(COMM_OK, 0, "Comm::ConnOpener::connected");
}
-/** Make an FD connection attempt.
- * Handles the case(s) when a partially setup connection gets closed early.
- */
+/// Make an FD connection attempt.
void
Comm::ConnOpener::connect()
{
Must(conn_ != NULL);
-
- // our parent Jobs signal abort by cancelling their callbacks.
- if (callback_ == NULL || callback_->canceled())
- return;
+ Must(temporaryFd_ >= 0);
++ totalTries_;
switch (comm_connect_addr(temporaryFd_, conn_->remote) ) {
case COMM_INPROGRESS:
- // check for timeout FIRST.
- if (squid_curtime - connectStart_ > connectTimeout_) {
- debugs(5, 5, HERE << conn_ << ": * - ERR took too long already.");
- calls_.earlyAbort_->cancel("Comm::ConnOpener::connect timed out");
- doneConnecting(COMM_TIMEOUT, errno);
- return;
- } else {
- debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS");
- Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, Comm::ConnOpener::InProgressConnectRetry, new Pointer(this), 0);
- }
+ debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS");
+ Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, Comm::ConnOpener::InProgressConnectRetry, new Pointer(this), 0);
break;
case COMM_OK:
debugs(5, 5, HERE << conn_ << ": COMM_OK - connected");
connected();
- doneConnecting(COMM_OK, 0);
break;
- default:
+ default: {
+ const int xerrno = errno;
+
++failRetries_;
+ debugs(5, 7, conn_ << ": failure #" << failRetries_ << " <= " <<
+ Config.connect_retries << ": " << xstrerr(xerrno));
- // check for timeout FIRST.
- if (squid_curtime - connectStart_ > connectTimeout_) {
- debugs(5, 5, HERE << conn_ << ": * - ERR took too long to receive response.");
- calls_.earlyAbort_->cancel("Comm::ConnOpener::connect timed out");
- doneConnecting(COMM_TIMEOUT, errno);
- } else if (failRetries_ < Config.connect_retries) {
+ if (failRetries_ < Config.connect_retries) {
debugs(5, 5, HERE << conn_ << ": * - try again");
- eventAdd("Comm::ConnOpener::DelayedConnectRetry", Comm::ConnOpener::DelayedConnectRetry, new Pointer(this), 0.05, 0, false);
+ sleep();
return;
} else {
// send ERROR back to the upper layer.
debugs(5, 5, HERE << conn_ << ": * - ERR tried too many times already.");
- calls_.earlyAbort_->cancel("Comm::ConnOpener::connect failed");
- doneConnecting(COMM_ERR_CONNECT, errno);
+ sendAnswer(COMM_ERR_CONNECT, xerrno, "Comm::ConnOpener::connect");
}
}
+ }
+}
+
+/// Close and wait a little before trying to open and connect again.
+void
+Comm::ConnOpener::sleep() {
+ Must(!calls_.sleep_);
+ closeFd();
+ calls_.sleep_ = true;
+ eventAdd("Comm::ConnOpener::DelayedConnectRetry",
+ Comm::ConnOpener::DelayedConnectRetry,
+ new Pointer(this), 0.05, 0, false);
+}
+
+/// cleans up this job's sleep state
+void
+Comm::ConnOpener::cancelSleep()
+{
+ if (calls_.sleep_) {
+ // It would be nice to delete the sleep event, but it might be out of
+ // the event queue and in the async queue already, so (a) we do not know
+ // whether we can safely delete the call ptr here and (b) eventDelete()
+ // will assert if the event went async. TODO: Revise eventDelete() API.
+ // eventDelete(Comm::ConnOpener::DelayedConnectRetry, calls_.sleep);
+ calls_.sleep_ = false;
+ debugs(5, 9, conn_ << " stops sleeping");
+ }
}
/**
* Lookup local-end address and port of the TCP link just opened.
* This ensure the connection local details are set correctly
*/
void
Comm::ConnOpener::lookupLocalAddress()
{
struct addrinfo *addr = NULL;
conn_->local.InitAddrInfo(addr);
if (getsockname(conn_->fd, addr->ai_addr, &(addr->ai_addrlen)) != 0) {
debugs(50, DBG_IMPORTANT, "ERROR: Failed to retrieve TCP/UDP details for socket: " << conn_ << ": " << xstrerror());
conn_->local.FreeAddrInfo(addr);
return;
}
conn_->local = *addr;
conn_->local.FreeAddrInfo(addr);
debugs(5, 6, HERE << conn_);
}
/** Abort connection attempt.
* Handles the case(s) when a partially setup connection gets closed early.
*/
void
Comm::ConnOpener::earlyAbort(const CommCloseCbParams &io)
{
debugs(5, 3, HERE << io.conn);
- doneConnecting(COMM_ERR_CLOSING, io.xerrno); // NP: is closing or shutdown better?
+ calls_.earlyAbort_ = NULL;
+ // NP: is closing or shutdown better?
+ sendAnswer(COMM_ERR_CLOSING, io.xerrno, "Comm::ConnOpener::earlyAbort");
}
/**
* Handles the case(s) when a partially setup connection gets timed out.
* NP: When commSetConnTimeout accepts generic CommCommonCbParams this can die.
*/
void
Comm::ConnOpener::timeout(const CommTimeoutCbParams &)
{
- connect();
+ debugs(5, 5, HERE << conn_ << ": * - ERR took too long to receive response.");
+ calls_.timeout_ = NULL;
+ sendAnswer(COMM_TIMEOUT, ETIMEDOUT, "Comm::ConnOpener::timeout");
}
/* Legacy Wrapper for the retry event after COMM_INPROGRESS
* XXX: As soon as Comm::SetSelect() accepts Async calls we can use a ConnOpener::connect call
*/
void
Comm::ConnOpener::InProgressConnectRetry(int fd, void *data)
{
Pointer *ptr = static_cast<Pointer*>(data);
assert(ptr);
if (ConnOpener *cs = ptr->valid()) {
// Ew. we are now outside the all AsyncJob protections.
// get back inside by scheduling another call...
typedef NullaryMemFunT<Comm::ConnOpener> Dialer;
AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::connect);
ScheduleCallHere(call);
}
delete ptr;
}
/* Legacy Wrapper for the retry event with small delay after errors.
- * XXX: As soon as eventAdd() accepts Async calls we can use a ConnOpener::connect call
+ * XXX: As soon as eventAdd() accepts Async calls we can use a ConnOpener::restart call
*/
void
Comm::ConnOpener::DelayedConnectRetry(void *data)
{
Pointer *ptr = static_cast<Pointer*>(data);
assert(ptr);
if (ConnOpener *cs = ptr->valid()) {
// Ew. we are now outside the all AsyncJob protections.
// get back inside by scheduling another call...
typedef NullaryMemFunT<Comm::ConnOpener> Dialer;
- AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::connect);
+ AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::restart);
ScheduleCallHere(call);
}
delete ptr;
}
=== modified file 'src/comm/ConnOpener.h'
--- src/comm/ConnOpener.h 2013-01-02 23:40:49 +0000
+++ src/comm/ConnOpener.h 2013-01-25 01:43:28 +0000
@@ -23,57 +23,64 @@
public:
void noteAbort() { mustStop("externally aborted"); }
typedef CbcPointer<ConnOpener> Pointer;
virtual bool doneAll() const;
ConnOpener(Comm::ConnectionPointer &, AsyncCall::Pointer &handler, time_t connect_timeout);
~ConnOpener();
void setHost(const char *); ///< set the hostname note for this connection
const char * getHost() const; ///< get the hostname noted for this connection
private:
// Undefined because two openers cannot share a connection
ConnOpener(const ConnOpener &);
ConnOpener & operator =(const ConnOpener &c);
void earlyAbort(const CommCloseCbParams &);
void timeout(const CommTimeoutCbParams &);
- void doneConnecting(comm_err_t errFlag, int xerrno);
+ void sendAnswer(comm_err_t errFlag, int xerrno, const char *why);
static void InProgressConnectRetry(int fd, void *data);
static void DelayedConnectRetry(void *data);
void connect();
void connected();
void lookupLocalAddress();
+ bool open();
+ void sleep();
+ void restart();
+
+ void closeFd();
+ void keepFd();
+
+ void cleanFd();
+ void cancelSleep();
+
private:
char *host_; ///< domain name we are trying to connect to.
int temporaryFd_; ///< the FD being opened. Do NOT set conn_->fd until it is fully open.
Comm::ConnectionPointer conn_; ///< single connection currently to be opened.
AsyncCall::Pointer callback_; ///< handler to be called on connection completion.
int totalTries_; ///< total number of connection attempts over all destinations so far.
int failRetries_; ///< number of retries current destination has been tried.
- /**
- * time at which to abandon the connection.
- * the connection-done callback will be passed COMM_TIMEOUT
- */
- time_t connectTimeout_;
-
- /// time at which this series of connection attempts was started.
- time_t connectStart_;
+ /// if we are not done by then, we will call back with COMM_TIMEOUT
+ time_t deadline_;
/// handles to calls which we may need to cancel.
struct Calls {
AsyncCall::Pointer earlyAbort_;
AsyncCall::Pointer timeout_;
+ /// Whether we are idling before retrying to connect; not yet a call
+ /// [that we can cancel], but it will probably become one eventually.
+ bool sleep_;
} calls_;
CBDATA_CLASS2(ConnOpener);
};
}; // namespace Comm
#endif /* _SQUID_SRC_COMM_CONNOPENER_H */
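The new doneAll() and sendAnswer() split establishes an invariant. The job is finished once the callback is gone or canceled. Otherwise it must be waiting on either an open temporaryFd_ or the retry sleep. Below is a minimal model of that invariant and of the answer-exactly-once rule. It uses std::logic_error as a stand-in for Squid's Must() failure. OpenerJob and Callback are hypothetical names, not Squid classes.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the initiator's callback.
struct Callback {
    bool canceled = false;
    int fired = 0;
    std::string result;
};

struct OpenerJob {
    std::shared_ptr<Callback> callback;
    int tempFd = -1;        // like temporaryFd_
    bool sleeping = false;  // like calls_.sleep_
    std::string stopReason;

    bool doneAll() const {
        if (!callback || callback->canceled)
            return true;
        // otherwise, we must be waiting for something
        if (!(tempFd >= 0 || sleeping))
            throw std::logic_error("neither waiting for I/O nor sleeping");
        return false;
    }

    // Answers at most once: the callback is dropped after use.
    void sendAnswer(const std::string &result, const std::string &why) {
        if (callback) {
            callback->result = result;
            ++callback->fired;
            callback.reset();
        }
        stopReason = why;   // models mustStop(why)
    }

    void swanSong() {
        if (callback)       // inform a still-waiting caller we are dying
            sendAnswer("ERR_CONNECT", "swanSong");
        tempFd = -1;        // models closeFd()
        sleeping = false;   // models cancelSleep()
    }
};
```

Clearing the callback inside sendAnswer() is what lets swanSong() safely call it again. A job that already answered skips the error answer, while a job stopped mid-sleep still informs its initiator.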