This is an automated email from the ASF dual-hosted git repository. zwoop pushed a commit to branch 9.0.x in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit d15c45484e15ec7c141c6af66116c57dd2ce62e2 Author: Fei Deng <duke8...@gmail.com> AuthorDate: Wed Oct 30 15:55:15 2019 -0500 cleanup the eventloop (cherry picked from commit c179620382dc37eb33f39357f458175800756f14) Conflicts: iocore/net/UnixNetAccept.cc --- iocore/aio/AIO.cc | 4 ++-- iocore/cache/CacheWrite.cc | 6 +++--- iocore/dns/DNS.cc | 2 +- iocore/eventsystem/I_EThread.h | 6 +----- iocore/eventsystem/I_EventProcessor.h | 8 ++------ iocore/eventsystem/I_ProtectedQueue.h | 4 +--- iocore/eventsystem/P_UnixEThread.h | 13 ++----------- iocore/eventsystem/P_UnixEventProcessor.h | 18 ++---------------- iocore/eventsystem/ProtectedQueue.cc | 21 +-------------------- iocore/eventsystem/UnixEThread.cc | 16 +--------------- iocore/net/UnixNetAccept.cc | 2 +- 11 files changed, 17 insertions(+), 83 deletions(-) diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc index 5543919..df8e27b 100644 --- a/iocore/aio/AIO.cc +++ b/iocore/aio/AIO.cc @@ -480,9 +480,9 @@ aio_thread_main(void *arg) SCOPED_MUTEX_LOCK(lock, op->mutex, thr_info->mutex->thread_holding); op->handleEvent(EVENT_NONE, nullptr); } else if (op->thread == AIO_CALLBACK_THREAD_ANY) { - eventProcessor.schedule_imm_signal(op); + eventProcessor.schedule_imm(op); } else { - op->thread->schedule_imm_signal(op); + op->thread->schedule_imm(op); } ink_mutex_acquire(&my_aio_req->aio_mutex); } while (true); diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc index 944d527..cff1818 100644 --- a/iocore/cache/CacheWrite.cc +++ b/iocore/cache/CacheWrite.cc @@ -360,7 +360,7 @@ Vol::aggWriteDone(int event, Event *e) CacheVC *c = nullptr; while ((c = sync.dequeue())) { if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) { - eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE); + eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE); } else { sync.push(c); // put it back on the front break; @@ -1028,7 +1028,7 @@ Lagain: ink_assert(false); while ((c = agg.dequeue())) { agg_todo_size -= c->agg_len; - eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE); + eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE); } return EVENT_CONT; } @@ -1092,7 +1092,7 @@ Lwait: if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) { ret = EVENT_RETURN; } else { - eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE); + eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE); } } return ret; diff --git a/iocore/dns/DNS.cc b/iocore/dns/DNS.cc index 6713003..d33c4dc 100644 --- a/iocore/dns/DNS.cc +++ b/iocore/dns/DNS.cc @@ -1437,7 +1437,7 @@ DNSEntry::post(DNSHandler *h, HostEnt *ent) } else { mutex = action.mutex; SET_HANDLER(&DNSEntry::postOneEvent); - submit_thread->schedule_imm_signal(this); + submit_thread->schedule_imm(this); } return 0; } diff --git a/iocore/eventsystem/I_EThread.h b/iocore/eventsystem/I_EThread.h index c70fb04..45afc73 100644 --- a/iocore/eventsystem/I_EThread.h +++ b/iocore/eventsystem/I_EThread.h @@ -126,7 +126,6 @@ public: */ Event *schedule_imm(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr); - Event *schedule_imm_signal(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr); /** Schedules the continuation on this EThread to receive an event @@ -300,7 +299,7 @@ public: EThread &operator=(const EThread &) = delete; ~EThread() override; - Event *schedule(Event *e, bool fast_signal = false); + Event *schedule(Event *e); /** Block of memory to allocate thread specific data e.g. stat system arrays. */ char thread_private[PER_THREAD_DATA]; @@ -314,9 +313,6 @@ public: ProtectedQueue EventQueueExternal; PriorityEventQueue EventQueue; - EThread **ethreads_to_be_signalled = nullptr; - int n_ethreads_to_be_signalled = 0; - static constexpr int NO_ETHREAD_ID = -1; int id = NO_ETHREAD_ID; unsigned int event_types = 0; diff --git a/iocore/eventsystem/I_EventProcessor.h b/iocore/eventsystem/I_EventProcessor.h index 227ea19..782137a 100644 --- a/iocore/eventsystem/I_EventProcessor.h +++ b/iocore/eventsystem/I_EventProcessor.h @@ -151,11 +151,7 @@ public: */ Event *schedule_imm(Continuation *c, EventType event_type = ET_CALL, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr); - /* - provides the same functionality as schedule_imm and also signals the thread immediately - */ - Event *schedule_imm_signal(Continuation *c, EventType event_type = ET_CALL, int callback_event = EVENT_IMMEDIATE, - void *cookie = nullptr); + /** Schedules the continuation on a specific thread group to receive an event at the given timeout. Requests the EventProcessor to schedule @@ -331,7 +327,7 @@ public: | Unix & non NT Interface | \*------------------------------------------------------*/ - Event *schedule(Event *e, EventType etype, bool fast_signal = false); + Event *schedule(Event *e, EventType etype); EThread *assign_thread(EventType etype); EThread *assign_affinity_by_type(Continuation *cont, EventType etype); diff --git a/iocore/eventsystem/I_ProtectedQueue.h b/iocore/eventsystem/I_ProtectedQueue.h index 7742fac..8bd0eeb 100644 --- a/iocore/eventsystem/I_ProtectedQueue.h +++ b/iocore/eventsystem/I_ProtectedQueue.h @@ -37,7 +37,7 @@ #include "tscore/ink_platform.h" #include "I_Event.h" struct ProtectedQueue { - void enqueue(Event *e, bool fast_signal = false); + void enqueue(Event *e); void signal(); int try_signal(); // Use non blocking lock and if acquired, signal void enqueue_local(Event *e); // Safe when called from the same thread @@ -54,5 +54,3 @@ struct ProtectedQueue { ProtectedQueue(); }; - -void flush_signals(EThread *t); diff --git a/iocore/eventsystem/P_UnixEThread.h b/iocore/eventsystem/P_UnixEThread.h index b65e3a7..55f093a 100644 --- a/iocore/eventsystem/P_UnixEThread.h +++ b/iocore/eventsystem/P_UnixEThread.h @@ -45,15 +45,6 @@ EThread::schedule_imm(Continuation *cont, int callback_event, void *cookie) } TS_INLINE Event * -EThread::schedule_imm_signal(Continuation *cont, int callback_event, void *cookie) -{ - Event *e = ::eventAllocator.alloc(); - e->callback_event = callback_event; - e->cookie = cookie; - return schedule(e->init(cont, 0, 0), true); -} - -TS_INLINE Event * EThread::schedule_at(Continuation *cont, ink_hrtime t, int callback_event, void *cookie) { Event *e = ::eventAllocator.alloc(); @@ -85,7 +76,7 @@ EThread::schedule_every(Continuation *cont, ink_hrtime t, int callback_event, vo } TS_INLINE Event * -EThread::schedule(Event *e, bool fast_signal) +EThread::schedule(Event *e) { e->ethread = this; ink_assert(tt == REGULAR); @@ -100,7 +91,7 @@ EThread::schedule(Event *e, bool fast_signal) // The continuation that gets scheduled later is not always the // client VC, it can be HttpCacheSM etc. so save the flags e->continuation->control_flags.set_flags(get_cont_flags().get_flags()); - EventQueueExternal.enqueue(e, fast_signal); + EventQueueExternal.enqueue(e); return e; } diff --git a/iocore/eventsystem/P_UnixEventProcessor.h b/iocore/eventsystem/P_UnixEventProcessor.h index 6e89d5b..fe89945 100644 --- a/iocore/eventsystem/P_UnixEventProcessor.h +++ b/iocore/eventsystem/P_UnixEventProcessor.h @@ -89,7 +89,7 @@ EventProcessor::assign_affinity_by_type(Continuation *cont, EventType etype) } TS_INLINE Event * -EventProcessor::schedule(Event *e, EventType etype, bool fast_signal) +EventProcessor::schedule(Event *e, EventType etype) { ink_assert(etype < MAX_EVENT_TYPES); @@ -116,25 +116,11 @@ EventProcessor::schedule(Event *e, EventType etype, bool fast_signal) if (e->continuation->mutex) { e->mutex = e->continuation->mutex; } - e->ethread->EventQueueExternal.enqueue(e, fast_signal); + e->ethread->EventQueueExternal.enqueue(e); return e; } TS_INLINE Event * -EventProcessor::schedule_imm_signal(Continuation *cont, EventType et, int callback_event, void *cookie) -{ - Event *e = eventAllocator.alloc(); - - ink_assert(et < MAX_EVENT_TYPES); -#ifdef ENABLE_TIME_TRACE - e->start_time = Thread::get_hrtime(); -#endif - e->callback_event = callback_event; - e->cookie = cookie; - return schedule(e->init(cont, 0, 0), et, true); -} - -TS_INLINE Event * EventProcessor::schedule_imm(Continuation *cont, EventType et, int callback_event, void *cookie) { Event *e = eventAllocator.alloc(); diff --git a/iocore/eventsystem/ProtectedQueue.cc b/iocore/eventsystem/ProtectedQueue.cc index e8741c7..d8a14da 100644 --- a/iocore/eventsystem/ProtectedQueue.cc +++ b/iocore/eventsystem/ProtectedQueue.cc @@ -44,7 +44,7 @@ extern ClassAllocator<Event> eventAllocator; void -ProtectedQueue::enqueue(Event *e, bool fast_signal) +ProtectedQueue::enqueue(Event *e) { ink_assert(!e->in_the_prot_queue && !e->in_the_priority_queue); EThread *e_ethread = e->ethread; @@ -62,25 +62,6 @@ ProtectedQueue::enqueue(Event *e, bool fast_signal) } void -flush_signals(EThread *thr) -{ - ink_assert(this_ethread() == thr); - int n = thr->n_ethreads_to_be_signalled; - if (n > eventProcessor.n_ethreads) { - n = eventProcessor.n_ethreads; // MAX - } - int i; - - for (i = 0; i < n; i++) { - if (thr->ethreads_to_be_signalled[i]) { - thr->ethreads_to_be_signalled[i]->tail_cb->signalActivity(); - thr->ethreads_to_be_signalled[i] = nullptr; - } - } - thr->n_ethreads_to_be_signalled = 0; -} - -void ProtectedQueue::dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool sleep) { (void)cur_time; diff --git a/iocore/eventsystem/UnixEThread.cc b/iocore/eventsystem/UnixEThread.cc index df44c71..06c9943 100644 --- a/iocore/eventsystem/UnixEThread.cc +++ b/iocore/eventsystem/UnixEThread.cc @@ -58,8 +58,6 @@ EThread::EThread() EThread::EThread(ThreadType att, int anid) : id(anid), tt(att) { - ethreads_to_be_signalled = static_cast<EThread **>(ats_malloc(MAX_EVENT_THREADS * sizeof(EThread *))); - memset(ethreads_to_be_signalled, 0, MAX_EVENT_THREADS * sizeof(EThread *)); memset(thread_private, 0, PER_THREAD_DATA); #if HAVE_EVENTFD evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); @@ -94,15 +92,7 @@ EThread::EThread(ThreadType att, Event *e) : tt(att), start_event(e) // Provide a destructor so that SDK functions which create and destroy // threads won't have to deal with EThread memory deallocation. -EThread::~EThread() -{ - if (n_ethreads_to_be_signalled > 0) { - flush_signals(this); - } - ats_free(ethreads_to_be_signalled); - // TODO: This can't be deleted .... - // delete[]l1_hash; -} +EThread::~EThread() {} bool EThread::is_event_type(EventType et) @@ -273,10 +263,6 @@ EThread::execute_regular() sleep_time = 0; } - if (n_ethreads_to_be_signalled) { - flush_signals(this); - } - tail_cb->waitForActivity(sleep_time); // loop cleanup diff --git a/iocore/net/UnixNetAccept.cc b/iocore/net/UnixNetAccept.cc index 7109a27..89c1c96 100644 --- a/iocore/net/UnixNetAccept.cc +++ b/iocore/net/UnixNetAccept.cc @@ -360,7 +360,7 @@ NetAccept::do_blocking_accept(EThread *t) NetHandler *h = get_NetHandler(localt); // Assign NetHandler->mutex to NetVC vc->mutex = h->mutex; - localt->schedule_imm_signal(vc); + localt->schedule_imm(vc); } while (loop); return 1;