Repository: trafficserver Updated Branches: refs/heads/master 7c42469a7 -> c181e7eea
TS-3797: Crashes due to cross-thread race conditions. This closes #275 Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/c181e7ee Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/c181e7ee Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/c181e7ee Branch: refs/heads/master Commit: c181e7eea93592fa496247118f72e8323846fd5a Parents: 7c42469 Author: shinrich <shinr...@yahoo-inc.com> Authored: Tue Aug 11 14:20:46 2015 -0500 Committer: shinrich <shinr...@yahoo-inc.com> Committed: Fri Aug 21 14:45:12 2015 -0500 ---------------------------------------------------------------------- iocore/net/Connection.cc | 22 +++++- iocore/net/P_Connection.h | 12 ++++ iocore/net/P_SSLNetVConnection.h | 7 ++ iocore/net/P_UnixNetVConnection.h | 12 ++++ iocore/net/SSLNetVConnection.cc | 25 +++++++ iocore/net/UnixNetAccept.cc | 4 +- iocore/net/UnixNetVConnection.cc | 118 +++++++++++++++++++++++++++++---- proxy/http/HttpServerSession.cc | 4 +- proxy/http/HttpServerSession.h | 5 ++ proxy/http/HttpSessionManager.cc | 81 ++++++++++++++-------- proxy/http/HttpTransact.cc | 1 - 11 files changed, 246 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c181e7ee/iocore/net/Connection.cc ---------------------------------------------------------------------- diff --git a/iocore/net/Connection.cc b/iocore/net/Connection.cc index 2a582ab..57532da 100644 --- a/iocore/net/Connection.cc +++ b/iocore/net/Connection.cc @@ -62,7 +62,7 @@ NetVCOptions::toString(addr_bind_style s) return ANY_ADDR == s ? "any" : INTF_ADDR == s ? 
"interface" : "foreign"; } -Connection::Connection() : fd(NO_FD), is_bound(false), is_connected(false), sock_type(0) +Connection::Connection() : fd(NO_FD), is_bound(false), is_connected(false), is_zombie(false), sock_type(0) { memset(&addr, 0, sizeof(addr)); } @@ -114,16 +114,34 @@ Connection::close() is_connected = false; is_bound = false; // don't close any of the standards - if (fd >= 2) { + if (fd >= 2 && !is_zombie) { int fd_save = fd; fd = NO_FD; return socketManager.close(fd_save); } else { fd = NO_FD; + is_zombie = false; return -EBADF; } } +/** + * Move control of the socket from the argument object orig to the current object. + * Orig is marked as zombie, so when it is freed, the socket will not be closed + */ +void +Connection::move(Connection &orig) +{ + this->is_connected = orig.is_connected; + this->is_bound = orig.is_bound; + this->fd = orig.fd; + this->addr = orig.addr; + this->sock_type = orig.sock_type; + // The original is now the zombie + // The target has taken ownership of the file descriptor + orig.is_zombie = true; +} + static int add_http_filter(int fd ATS_UNUSED) { http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c181e7ee/iocore/net/P_Connection.h ---------------------------------------------------------------------- diff --git a/iocore/net/P_Connection.h b/iocore/net/P_Connection.h index a21c801..4dd2696 100644 --- a/iocore/net/P_Connection.h +++ b/iocore/net/P_Connection.h @@ -83,6 +83,7 @@ struct Connection { IpEndpoint addr; ///< Associated address. bool is_bound; ///< Flag for already bound to a local address. bool is_connected; ///< Flag for already connected. + bool is_zombie; ///< Flag true if the fd should not be closed int sock_type; /** Create and initialize the socket for this connection. @@ -139,6 +140,17 @@ struct Connection { /// Default options. static NetVCOptions const DEFAULT_OPTIONS; + /** + * Move control of the socket from the argument object orig to the current object. 
+ * Orig is marked as zombie, so when it is freed, the socket will not be closed + */ + void move(Connection &); + +private: + // Don't want copy constructors, to avoid having the destructor on + // temporary copies close the file descriptor too soon. Use move instead + Connection(Connection const &); + protected: void _cleanup(); }; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c181e7ee/iocore/net/P_SSLNetVConnection.h ---------------------------------------------------------------------- diff --git a/iocore/net/P_SSLNetVConnection.h b/iocore/net/P_SSLNetVConnection.h index 853f097..59d492c 100644 --- a/iocore/net/P_SSLNetVConnection.h +++ b/iocore/net/P_SSLNetVConnection.h @@ -283,6 +283,13 @@ public: return SSL_get_cipher_name(ssl); } + /** + * Populate the current object based on the socket information in the + * con parameter and the ssl object in the arg parameter. + * This logic is invoked when the NetVC object is created in a new thread context + */ + virtual int populate(Connection &con, Continuation *c, void *arg); + private: SSLNetVConnection(const SSLNetVConnection &); SSLNetVConnection &operator=(const SSLNetVConnection &); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c181e7ee/iocore/net/P_UnixNetVConnection.h ---------------------------------------------------------------------- diff --git a/iocore/net/P_UnixNetVConnection.h b/iocore/net/P_UnixNetVConnection.h index 92bfeac..968b0b9 100644 --- a/iocore/net/P_UnixNetVConnection.h +++ b/iocore/net/P_UnixNetVConnection.h @@ -216,6 +216,12 @@ public: void readReschedule(NetHandler *nh); void writeReschedule(NetHandler *nh); void netActivity(EThread *lthread); + /** + * If the current object's thread does not match the t argument, create a new + * NetVC in the thread t context based on the socket and ssl information in the + * current NetVC and mark the current NetVC to be closed.
+ */ + UnixNetVConnection *migrateToCurrentThread(Continuation *c, EThread *t); Action action_; volatile int closed; @@ -271,6 +277,12 @@ public: int acceptEvent(int event, Event *e); int mainEvent(int event, Event *e); virtual int connectUp(EThread *t, int fd); + /** + * Populate the current object based on the socket information in the + * con parameter. + * This logic is invoked when the NetVC object is created in a new thread context + */ + virtual int populate(Connection &con, Continuation *c, void *arg); virtual void free(EThread *t); virtual ink_hrtime get_inactivity_timeout(); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c181e7ee/iocore/net/SSLNetVConnection.cc ---------------------------------------------------------------------- diff --git a/iocore/net/SSLNetVConnection.cc b/iocore/net/SSLNetVConnection.cc index 5e2b0ef..5ed072f 100644 --- a/iocore/net/SSLNetVConnection.cc +++ b/iocore/net/SSLNetVConnection.cc @@ -423,6 +423,13 @@ SSLNetVConnection::net_read_io(NetHandler *nh, EThread *lthread) readReschedule(nh); return; } + // Got closed by the HttpSessionManager thread during a migration + // The closed flag should be stable once we get the s->vio.mutex in that case + // (the global session pool mutex). + if (this->closed) { + this->super::net_read_io(nh, lthread); + return; + } // If it is not enabled, lower its priority. This allows // a fast connection to speed match a slower connection by // shifting down in priority even if it could read.
@@ -898,6 +905,7 @@ SSLNetVConnection::free(EThread *t) write.vio.vc_server = NULL; options.reset(); closed = 0; + con.close(); ink_assert(con.fd == NO_FD); if (ssl != NULL) { SSL_free(ssl); @@ -932,6 +940,7 @@ SSLNetVConnection::free(EThread *t) if (from_accept_thread) { sslNetVCAllocator.free(this); } else { + ink_assert(con.fd == NO_FD); THREAD_FREE(this, sslNetVCAllocator, t); } } @@ -1512,3 +1521,19 @@ SSLNetVConnection::computeSSLTrace() return trace; } + +int +SSLNetVConnection::populate(Connection &con, Continuation *c, void *arg) +{ + int retval = super::populate(con, c, arg); + if (retval != EVENT_DONE) + return retval; + // Add in the SSL data + this->ssl = (SSL *)arg; + // Maybe bring over the stats? + + this->sslHandShakeComplete = true; + this->sslClientConnection = true; + SSL_set_ex_data(this->ssl, get_ssl_client_data_index(), this); + return EVENT_DONE; +} http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c181e7ee/iocore/net/UnixNetAccept.cc ---------------------------------------------------------------------- diff --git a/iocore/net/UnixNetAccept.cc b/iocore/net/UnixNetAccept.cc index 20e9e9a..37ee6de 100644 --- a/iocore/net/UnixNetAccept.cc +++ b/iocore/net/UnixNetAccept.cc @@ -451,6 +451,7 @@ NetAccept::acceptFastEvent(int event, void *ep) return EVENT_DONE; } + ink_assert(vc->nh->mutex->thread_holding == this_ethread()); vc->nh->open_list.enqueue(vc); #ifdef USE_EDGE_TRIGGER @@ -464,8 +465,9 @@ NetAccept::acceptFastEvent(int event, void *ep) // We must be holding the lock already to do later do_io_read's SCOPED_MUTEX_LOCK(lock, vc->mutex, e->ethread); action_->continuation->handleEvent(NET_EVENT_ACCEPT, vc); - } else + } else { close_UnixNetVConnection(vc, e->ethread); + } } while (loop); Ldone: http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c181e7ee/iocore/net/UnixNetVConnection.cc ---------------------------------------------------------------------- diff --git a/iocore/net/UnixNetVConnection.cc 
b/iocore/net/UnixNetVConnection.cc index f32a25f..12ba4a5 100644 --- a/iocore/net/UnixNetVConnection.cc +++ b/iocore/net/UnixNetVConnection.cc @@ -101,6 +101,9 @@ close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t) vc->cancel_OOB(); vc->ep.stop(); vc->con.close(); + + ink_release_assert(vc->thread == t); + #ifdef INACTIVITY_TIMEOUT if (vc->inactivity_timeout) { vc->inactivity_timeout->cancel_action(vc); @@ -117,20 +120,22 @@ close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t) vc->inactivity_timeout_in = 0; vc->active_timeout_in = 0; - nh->open_list.remove(vc); - nh->cop_list.remove(vc); - nh->read_ready_list.remove(vc); - nh->write_ready_list.remove(vc); - if (vc->read.in_enabled_list) { - nh->read_enable_list.remove(vc); - vc->read.in_enabled_list = 0; - } - if (vc->write.in_enabled_list) { - nh->write_enable_list.remove(vc); - vc->write.in_enabled_list = 0; + if (nh) { + nh->open_list.remove(vc); + nh->cop_list.remove(vc); + nh->read_ready_list.remove(vc); + nh->write_ready_list.remove(vc); + if (vc->read.in_enabled_list) { + nh->read_enable_list.remove(vc); + vc->read.in_enabled_list = 0; + } + if (vc->write.in_enabled_list) { + nh->write_enable_list.remove(vc); + vc->write.in_enabled_list = 0; + } + vc->remove_from_keep_alive_queue(); + vc->remove_from_active_queue(); } - vc->remove_from_keep_alive_queue(); - vc->remove_from_active_queue(); vc->free(t); } @@ -254,6 +259,14 @@ read_from_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread) read_reschedule(nh, vc); return; } + + // It is possible that the closed flag got set from HttpSessionManager in the + // global session pool case. If so, the closed flag should be stable once we get the + // s->vio.mutex (the global session pool mutex). + if (vc->closed) { + close_UnixNetVConnection(vc, thread); + return; + } // if it is not enabled. 
if (!s->enabled || s->vio.op != VIO::READ) { read_disable(nh, vc); @@ -672,7 +685,7 @@ UnixNetVConnection::do_io_close(int alerrno /* = -1 */) write.vio._cont = NULL; EThread *t = this_ethread(); - bool close_inline = !recursion && nh->mutex->thread_holding == t; + bool close_inline = !recursion && (!nh || nh->mutex->thread_holding == t); INK_WRITE_MEMORY_BARRIER; if (alerrno && alerrno != -1) @@ -1183,6 +1196,34 @@ UnixNetVConnection::mainEvent(int event, Event *e) return EVENT_DONE; } +int +UnixNetVConnection::populate(Connection &con_in, Continuation *c, void *arg) +{ + this->con.move(con_in); + this->mutex = c->mutex; + this->thread = this_ethread(); + + EThread *t = this_ethread(); + if (ep.start(get_PollDescriptor(t), this, EVENTIO_READ | EVENTIO_WRITE) < 0) { + Debug("iocore_net", "populate : Failed to add to epoll list\n"); + return EVENT_ERROR; + } + + SET_HANDLER(&UnixNetVConnection::mainEvent); + + this->nh = get_NetHandler(t); + ink_assert(this->nh != NULL); + MUTEX_TRY_LOCK(lock, this->nh->mutex, t); + if (!lock.is_locked()) { + // Clean up and go home + return EVENT_ERROR; + } + ink_assert(nh->mutex->thread_holding == this_ethread()); + ink_assert(!nh->open_list.in(this)); + this->nh->open_list.enqueue(this); + ink_assert(this->con.fd != NO_FD); + return EVENT_DONE; +} int UnixNetVConnection::connectUp(EThread *t, int fd) @@ -1273,6 +1314,7 @@ fail: void UnixNetVConnection::free(EThread *t) { + ink_release_assert(t == this_ethread()); NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, -1); // clear variables for reuse this->mutex.clear(); @@ -1318,3 +1360,51 @@ UnixNetVConnection::apply_options() { con.apply_options(options); } + +/* + * Close down the current netVC. 
Save aside the socket and SSL information + * and create new netVC in the current thread/netVC + */ +UnixNetVConnection * +UnixNetVConnection::migrateToCurrentThread(Continuation *cont, EThread *t) +{ + NetHandler *client_nh = get_NetHandler(t); + ink_assert(client_nh); + if (this->nh == client_nh) { + // We're already there! + return this; + } + Connection hold_con; + hold_con.move(this->con); + SSLNetVConnection *sslvc = dynamic_cast<SSLNetVConnection *>(this); + SSL *save_ssl = (sslvc) ? sslvc->ssl : NULL; + if (save_ssl) { + SSL_set_ex_data(sslvc->ssl, get_ssl_client_data_index(), NULL); + sslvc->ssl = NULL; + } + + // Do_io_close will signal the VC to be freed on the original thread + // Since we moved the con context, the fd will not be closed + this->do_io_close(); + + // Create new VC: + NetVConnection *new_vc = NULL; + if (save_ssl) { + new_vc = sslNetProcessor.allocate_vc(t); + SSLNetVConnection *sslvc = dynamic_cast<SSLNetVConnection *>(new_vc); + if (sslvc->populate(hold_con, cont, save_ssl) != EVENT_DONE) { + sslvc->do_io_close(); + sslvc = NULL; + } + return sslvc; + // Update the SSL fields + } else { + new_vc = netProcessor.allocate_vc(t); + UnixNetVConnection *netvc = dynamic_cast<UnixNetVConnection *>(new_vc); + if (netvc->populate(hold_con, cont, save_ssl) != EVENT_DONE) { + netvc->do_io_close(); + netvc = NULL; + } + return netvc; + } +} http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c181e7ee/proxy/http/HttpServerSession.cc ---------------------------------------------------------------------- diff --git a/proxy/http/HttpServerSession.cc b/proxy/http/HttpServerSession.cc index 8e5ce73..1e5405f 100644 --- a/proxy/http/HttpServerSession.cc +++ b/proxy/http/HttpServerSession.cc @@ -120,7 +120,9 @@ HttpServerSession::do_io_close(int alerrno) Debug("http_ss", "[%" PRId64 "] session closing, netvc %p", con_id, server_vc); - server_vc->do_io_close(alerrno); + if (server_vc) { + server_vc->do_io_close(alerrno); + } server_vc = NULL; 
HTTP_SUM_GLOBAL_DYN_STAT(http_current_server_connections_stat, -1); // Make sure to work on the global stat http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c181e7ee/proxy/http/HttpServerSession.h ---------------------------------------------------------------------- diff --git a/proxy/http/HttpServerSession.h b/proxy/http/HttpServerSession.h index 61ded32..3ec5f0c 100644 --- a/proxy/http/HttpServerSession.h +++ b/proxy/http/HttpServerSession.h @@ -112,6 +112,11 @@ public: { return server_vc; }; + void + set_netvc(NetVConnection *new_vc) + { + server_vc = new_vc; + } // Keys for matching hostnames IpEndpoint server_ip; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c181e7ee/proxy/http/HttpSessionManager.cc ---------------------------------------------------------------------- diff --git a/proxy/http/HttpSessionManager.cc b/proxy/http/HttpSessionManager.cc index 5feb9c4..2dc1e43 100644 --- a/proxy/http/HttpSessionManager.cc +++ b/proxy/http/HttpSessionManager.cc @@ -263,33 +263,62 @@ HttpSessionManager::acquire_session(Continuation * /* cont ATS_UNUSED */, sockad to_return = NULL; } - // Now check to see if we have a connection in our shared connection pool - EThread *ethread = this_ethread(); - - ProxyMutex *pool_mutex = (TS_SERVER_SESSION_SHARING_POOL_THREAD == sm->t_state.http_config_param->server_session_sharing_pool) ? - ethread->server_session_pool->mutex : - m_g_pool->mutex; - MUTEX_TRY_LOCK(lock, pool_mutex, ethread); - if (lock.is_locked()) { - if (TS_SERVER_SESSION_SHARING_POOL_THREAD == sm->t_state.http_config_param->server_session_sharing_pool) { - retval = ethread->server_session_pool->acquireSession(ip, hostname_hash, match_style, to_return); - Debug("http_ss", "[acquire session] thread pool search %s", to_return ? "successful" : "failed"); - } else { - retval = m_g_pool->acquireSession(ip, hostname_hash, match_style, to_return); - Debug("http_ss", "[acquire session] global pool search %s", to_return ? 
"successful" : "failed"); - } - if (to_return) { - Debug("http_ss", "[%" PRId64 "] [acquire session] return session from shared pool", to_return->con_id); - to_return->state = HSS_ACTIVE; - // Holding the pool lock and the sm lock - // the attach_server_session will issue the do_io_read under the sm lock - // Must be careful to transfer the lock for the read vio because - // the server VC may be moving between threads TS-3266 - sm->attach_server_session(to_return); - retval = HSM_DONE; + // TS-3797 Adding another scope so the pool lock is dropped after it is removed from the pool and + // potentially moved to the current thread. At the end of this scope, either the original + // pool selected VC is on the current thread or its content has been moved to a new VC on the + // current thread and the original has been deleted. This should adequately cover TS-3266 so we + // don't have to continue to hold the pool thread while we initialize the server session in the + // client session + { + // Now check to see if we have a connection in our shared connection pool + EThread *ethread = this_ethread(); + ProxyMutex *pool_mutex = (TS_SERVER_SESSION_SHARING_POOL_THREAD == sm->t_state.http_config_param->server_session_sharing_pool) ? + ethread->server_session_pool->mutex : + m_g_pool->mutex; + MUTEX_TRY_LOCK(lock, pool_mutex, ethread); + if (lock.is_locked()) { + if (TS_SERVER_SESSION_SHARING_POOL_THREAD == sm->t_state.http_config_param->server_session_sharing_pool) { + retval = ethread->server_session_pool->acquireSession(ip, hostname_hash, match_style, to_return); + Debug("http_ss", "[acquire session] thread pool search %s", to_return ? "successful" : "failed"); + } else { + retval = m_g_pool->acquireSession(ip, hostname_hash, match_style, to_return); + Debug("http_ss", "[acquire session] global pool search %s", to_return ? "successful" : "failed"); + // At this point to_return has been removed from the pool. Do we need to move it + // to the same thread? 
+ if (to_return) { + UnixNetVConnection *server_vc = dynamic_cast<UnixNetVConnection *>(to_return->get_netvc()); + if (server_vc) { + UnixNetVConnection *new_vc = server_vc->migrateToCurrentThread(sm, ethread); + // The VC moved, free up the original one + if (new_vc != server_vc) { + ink_assert(new_vc == NULL || new_vc->nh != NULL); + to_return->set_netvc(new_vc); + if (!new_vc) { + // Close out to_return, we weren't able to get a connection + to_return->do_io_close(); + to_return = NULL; + } else { + // Keep things from timing out on us + new_vc->set_inactivity_timeout(new_vc->get_inactivity_timeout()); + } + } else { + // Keep things from timing out on us + server_vc->set_inactivity_timeout(server_vc->get_inactivity_timeout()); + } + } + } + } + } else { // Didn't get the lock. to_return is still NULL + retval = HSM_RETRY; } - } else { - retval = HSM_RETRY; + } + + if (to_return) { + Debug("http_ss", "[%" PRId64 "] [acquire session] return session from shared pool", to_return->con_id); + to_return->state = HSS_ACTIVE; + // the attach_server_session will issue the do_io_read under the sm lock + sm->attach_server_session(to_return); + retval = HSM_DONE; } return retval; } http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c181e7ee/proxy/http/HttpTransact.cc ---------------------------------------------------------------------- diff --git a/proxy/http/HttpTransact.cc b/proxy/http/HttpTransact.cc index a4d492e..15a3cd6 100644 --- a/proxy/http/HttpTransact.cc +++ b/proxy/http/HttpTransact.cc @@ -3826,7 +3826,6 @@ HttpTransact::handle_server_connection_not_open(State *s) DebugTxn("http_trans", "[handle_server_connection_not_open] (hscno)"); DebugTxn("http_seq", "[HttpTransact::handle_server_connection_not_open] "); ink_assert(s->current.state != CONNECTION_ALIVE); - ink_assert(s->current.server->had_connect_fail()); SET_VIA_STRING(VIA_SERVER_RESULT, VIA_SERVER_ERROR); HTTP_INCREMENT_TRANS_STAT(http_broken_server_connections_stat);