Hello
I have some new information regarding the possible issue in event_active(). I
verified that data corruption can be ruled out, because the function always
guards its work with TryEnterCriticalSection/EnterCriticalSection.
However, when I replaced event_active() with an alternative mechanism, the
application ran for more than 10 hours without any abnormalities. With
event_active(), the same implementation never ran for more than one hour
before entering an infinite loop in event_base_dispatch().
Below are the code snippets related to the change. Instead of calling
event_active(), a worker thread now wakes up the main I/O thread by sending a
single byte over a loopback TCP connection:
struct ThreadPool::impl
{
    struct ExclusiveData {
        struct CompareTasks {
            bool operator()(
                const shared_ptr<BackgroundTask> &left,
                const shared_ptr<BackgroundTask> &right)
            {
                return left->DueTime > right->DueTime;
            }
        };

        // Address of the loopback listener owned by the main I/O thread.
        union {
            struct sockaddr saddr;
            struct sockaddr_storage storage;
        } u;
        int saddr_len;
        SOCKET fd = INVALID_SOCKET;

        priority_queue<
            shared_ptr<BackgroundTask>,
            deque<shared_ptr<BackgroundTask>>,
            CompareTasks
        > ScheduledTasks;

        list<shared_ptr<BackgroundTask>> DueTasks;
        list<shared_ptr<BackgroundTask>> CompletedTasks;
        bool MustExit = false;

        // Wakes up the main I/O thread by sending a single byte over a
        // loopback TCP connection. The connection is established lazily on
        // first use and re-established after any failure.
        void SendByte()
        {
            if(fd == INVALID_SOCKET) {
                fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
                if(fd == INVALID_SOCKET) {
                    Scope;
                    Error << "socket cannot be created";
                    return;
                }
                if(connect(fd, &u.saddr, saddr_len) != 0) {
                    Scope;
                    Error << "connection failed " << WSAGetLastError();
                    closesocket(fd);
                    fd = INVALID_SOCKET;
                    return;
                }
            }
            char byte = 1;
            if(send(fd, &byte, 1, 0) == SOCKET_ERROR) {
                Scope;
                Error << "send failed";
                closesocket(fd);
                fd = INVALID_SOCKET;
                return;
            }
        }

        ~ExclusiveData()
        {
            if(fd != INVALID_SOCKET) closesocket(fd);
        }
    };

    list<pthread_t> Threads;
    SharedDataContainer<ExclusiveData> Data;
    map<size_t, list<shared_ptr<BackgroundTask>>> TasksByType;
};
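The snippets above do not show how u and saddr_len are populated. The
initialization is along these lines (simplified sketch; the actual port
selection is omitted, and InitWakeupAddress is just an illustrative name):

// Point the wake-up connection at the loopback listener owned by the
// main I/O thread. 'port' is whatever port that listener is bound to.
static void InitWakeupAddress(ThreadPool::impl::ExclusiveData &data,
                              unsigned short port)
{
    sockaddr_in addr = {};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    memcpy(&data.u.storage, &addr, sizeof(addr));
    data.saddr_len = sizeof(addr);
}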
class ThreadPoolIPC : GenericContext {
    ThreadPoolIPC(GenericServer *s, std::string &p) : GenericContext(s, p) {}
public:
    static GenericContext *Create(GenericServer *server, std::string &peer)
    {
        return new ThreadPoolIPC(server, peer);
    }

    // Drains everything that has accumulated in the input buffer, so that
    // several wake-up bytes coalesce into a single EventProc() run.
    static void Reader(struct bufferevent *bev, void *user_data)
    {
        char buf[256];
        auto input = bufferevent_get_input(bev);
        while(evbuffer_remove(input, buf, sizeof(buf)) > 0) {}
        auto ctx = static_cast<ThreadPoolIPC *>(user_data);
        ctx->service->pool->EventProc();
    }

    DataCallback GetReader() const override
    {
        return Reader;
    }
};
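For those without our GenericServer/GenericContext wrappers, the equivalent
wiring in plain libevent would look roughly like this (sketch only, error
handling omitted; wakeup_read_cb and accept_cb are illustrative names):

#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>

// Drain the wake-up bytes, then run the completion routines.
static void wakeup_read_cb(struct bufferevent *bev, void *user_data)
{
    char buf[256];
    struct evbuffer *input = bufferevent_get_input(bev);
    while(evbuffer_remove(input, buf, sizeof(buf)) > 0) {}
    // ... call the thread pool's EventProc() equivalent here ...
}

// Accept the worker's loopback connection and watch it for readability.
static void accept_cb(struct evconnlistener *listener, evutil_socket_t fd,
                      struct sockaddr *addr, int socklen, void *user_data)
{
    struct event_base *base = evconnlistener_get_base(listener);
    struct bufferevent *bev =
        bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    bufferevent_setcb(bev, wakeup_read_cb, NULL, NULL, user_data);
    bufferevent_enable(bev, EV_READ);
}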
void *ThreadPool::ThreadProc(void *user_data)
{
    TimedScope(0);
    auto ptr = static_cast<impl *>(user_data);
    shared_ptr<BackgroundTask> task;
    while(true) {
        {
            auto data = ptr->Data.Acquire();
            if(task) {
                Trace << "Waking up main I/O thread";
                data->CompletedTasks.push_back(move(task));
                // The only change from the previous version: SendByte()
                // replaces event_active().
                data->SendByte();
                task.reset();
            }
            if(data->MustExit) {
                Info << "Exiting the thread";
                break;
            }
            bool due_time_set = false;
            MillisecondsClock due_time;
            MillisecondsClock now = MillisecondsClock::Now();
            // Move every task whose due time has arrived onto the due list,
            // remembering the due time of the nearest task still scheduled.
            while(!data->ScheduledTasks.empty()) {
                auto &next = data->ScheduledTasks.top();
                if(next->DueTime > now) {
                    due_time = next->DueTime;
                    due_time_set = true;
                    break;
                }
                data->DueTasks.push_back(next);
                data->ScheduledTasks.pop();
            }
            if(!data->DueTasks.empty()) {
                task = move(data->DueTasks.front());
                data->DueTasks.pop_front();
            } else if(due_time_set) {
                long delay = due_time - now;
                Trace << "Entering idle state for " << delay << " milliseconds";
                data.WaitForSignal(&due_time.ts);
            } else {
                Trace << "Entering idle state indefinitely";
                data.WaitForSignal();
            }
        }
        if(!task) continue;
        Trace << "Running task " << task->Name;
        task->BackgroundWorker();
        Trace << "Task " << task->Name << " has been completed";
    }
    return 0;
}
// Now invoked directly from ThreadPoolIPC::Reader in the context of the
// main I/O thread.
void ThreadPool::EventProc()
{
    Scope;
    shared_ptr<BackgroundTask> task;
    while(true) {
        {
            auto data = ptr->Data.Acquire();
            if(task) {
                if(task->RestartDelay == 1) {
                    Trace << "Restarting task " << task->Name << " immediately";
                    data->DueTasks.push_back(move(task));
                    data.SendSignal();
                } else if(task->RestartDelay != 0) {
                    Trace << "Rescheduling task " << task->Name << " for "
                        << task->RestartDelay << " milliseconds from now";
                    task->DueTime = MillisecondsClock::Now() + task->RestartDelay;
                    data->ScheduledTasks.push(move(task));
                    data.SendSignal();
                } else if(task->ConcurrencyForbidden) {
                    auto &tasks = ptr->TasksByType[task->Type];
                    tasks.pop_front();
                    if(!tasks.empty()) {
                        Trace << task->Type << " left so far " << tasks.size();
                        data->DueTasks.push_back(move(tasks.front()));
                        data.SendSignal();
                    }
                }
                task.reset();
            }
            if(data->CompletedTasks.empty()) break;
            task = move(data->CompletedTasks.front());
            data->CompletedTasks.pop_front();
        }
        Trace << "Running completion routine for task " << task->Name;
        task->OnTaskComplete(service);
    }
}
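Note the asymmetry: the worker threads wake up the main I/O thread with
SendByte() over the loopback socket, whereas the main I/O thread wakes up the
workers with SendSignal() on the mutex/condition pair inside
SharedDataContainer. Only the former used to go through event_active().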
Do you want me to try to implement a compact test program (without the
dependency on the C++ interface) that reproduces the problem with
event_active()?
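If so, I imagine the skeleton would be something along these lines (untested
sketch; it only shows the shape of the test, with event_active() driven from
a second thread):

#include <winsock2.h>
#include <windows.h>
#include <process.h>
#include <event2/event.h>
#include <event2/thread.h>

static struct event *ev;

static void cb(evutil_socket_t fd, short what, void *arg)
{
    /* Trivial callback; the bug is about dispatching, not the handler. */
}

static unsigned __stdcall activator(void *arg)
{
    /* Hammer event_active() from a non-I/O thread. */
    for(;;) {
        event_active(ev, EV_READ, 0);
        Sleep(0);
    }
    return 0;
}

int main()
{
    WSADATA wsa;
    WSAStartup(MAKEWORD(2, 2), &wsa);
    evthread_use_windows_threads();

    struct event_base *base = event_base_new();
    ev = event_new(base, -1, EV_PERSIST, cb, NULL);

    /* A long dummy timeout keeps the event pending so the loop stays alive. */
    struct timeval tv = { 3600, 0 };
    event_add(ev, &tv);

    _beginthreadex(NULL, 0, activator, NULL, 0, NULL);
    return event_base_dispatch(base);
}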
Best regards,
Sten Kultakangas
On Wed, Sep 13, 2017 at 11:15 PM, Sten Kultakangas <[email protected]>
wrote:
> Hello
>
> I have implemented a thread pool for a network service application and ran
> into the following issue. After a fairly large number of calls to
> event_active() from a thread other than the main I/O thread,
> event_base_dispatch() hangs, causing high CPU usage. No events are
> dispatched after that.
>
> The infinite loop is caused by the event_base_loop() function, which calls
> win32_dispatch() via the following pointer:
>
> res = evsel->dispatch(base, tv_p);
>
> if (res == -1) {
>         event_debug(("%s: dispatch returned unsuccessfully.",
>             __func__));
>         retval = -1;
>         goto done;
> }
>
> I traced win32_dispatch() and discovered that at least select() does not
> fail. However, one of the underlying functions called by evmap_io_active_()
> calls SetLastError() with the value 5 (ERROR_ACCESS_DENIED). How can I
> troubleshoot this issue? Is there any other reliable way to wake up the
> thread running event_base_dispatch()?
>
> How can I make sure that libevent's internal data is not corrupted during
> a call to event_active() from another thread? All I know for sure is that
> the calls to event_active() are serialized, as is access to the data
> structures shared by the thread pool and the main I/O thread:
>
> void *ThreadPool::ThreadProc(void *user_data)
> {
>     TimedScope(0);
>     auto ptr = static_cast<impl *>(user_data);
>     shared_ptr<BackgroundTask> task;
>     while(true) {
>         {
>             // internally uses a mutex to ensure mutually
>             // exclusive access to the data contained by 'Data'
>             auto data = ptr->Data.Acquire();
>
>             if(task) {
>                 Trace << "Waking up main I/O thread";
>                 data->CompletedTasks.push_back(move(task));
>                 event_active(data->Event, EV_READ, 0);
>                 task.reset();
>             }
>
>             if(data->MustExit) {
>                 Info << "Exiting the thread";
>                 break;
>             }
>
>             bool due_time_set = false;
>             MillisecondsClock due_time;
>             MillisecondsClock now = MillisecondsClock::Now();
>
>             while(!data->ScheduledTasks.empty()) {
>                 auto &next = data->ScheduledTasks.top();
>                 if(next->DueTime > now) {
>                     due_time = next->DueTime;
>                     due_time_set = true;
>                     break;
>                 }
>                 data->DueTasks.push_back(next);
>                 data->ScheduledTasks.pop();
>             }
>
>             if(!data->DueTasks.empty()) {
>                 task = move(data->DueTasks.front());
>                 data->DueTasks.pop_front();
>
>             } else if(due_time_set) {
>                 long delay = due_time - now;
>                 Trace << "Entering idle state for " << delay << " milliseconds";
>                 data.WaitForSignal(&due_time.ts);
>
>             } else {
>                 Trace << "Entering idle state indefinitely";
>                 data.WaitForSignal();
>             }
>         }
>         if(!task) continue;
>
>         Trace << "Running task " << task->Name;
>         task->BackgroundWorker();
>         Trace << "Task " << task->Name << " has been completed";
>     }
>     return 0;
> }
>
>
>
> The following functions are called in the context of the main I/O thread:
>
> void ThreadPool::EventProc(evutil_socket_t, short, void *user_data)
> {
>     Scope;
>     auto pool = static_cast<ThreadPool *>(user_data);
>     auto ptr = pool->ptr.get();
>     shared_ptr<BackgroundTask> task;
>     while(true) {
>         {
>             auto data = ptr->Data.Acquire();
>             if(task) {
>                 if(task->RestartDelay == 1) {
>                     Trace << "Restarting task " << task->Name << " immediately";
>                     data->DueTasks.push_back(move(task));
>                     data.SendSignal();
>
>                 } else if(task->RestartDelay != 0) {
>                     Trace << "Rescheduling task " << task->Name << " for "
>                         << task->RestartDelay << " milliseconds from now";
>
>                     task->DueTime = MillisecondsClock::Now() + task->RestartDelay;
>                     data->ScheduledTasks.push(move(task));
>                     data.SendSignal();
>
>                 } else if(task->ConcurrencyForbidden) {
>                     auto &tasks = ptr->TasksByType[task->Type];
>                     tasks.pop_front();
>                     if(!tasks.empty()) {
>                         data->DueTasks.push_back(move(tasks.front()));
>                         data.SendSignal();
>                     }
>                 }
>                 task.reset();
>             }
>
>             if(data->CompletedTasks.empty()) break;
>             task = move(data->CompletedTasks.front());
>             data->CompletedTasks.pop_front();
>         }
>
>         Trace << "Running completion routine for task " << task->Name;
>         task->OnTaskComplete(pool->service);
>     }
> }
>
>
> void ThreadPool::RegisterTask(std::shared_ptr<BackgroundTask> task)
> {
>     auto &type = typeid(*task);
>     task->Type = type.hash_code();
>     task->Name = Demangle(type.name());
>
>     auto data = ptr->Data.Acquire();
>     if(task->ConcurrencyForbidden) {
>         // Append the new task to the list of the tasks having the same
>         // dynamic type.
>         auto &tasks = ptr->TasksByType[task->Type];
>         if(!tasks.empty()) {
>             // If there are other tasks of the same type, we simply append
>             // the new task to the list. The new task will be started as
>             // soon as the OnTaskComplete() member function of the last
>             // task having the same type returns.
>             tasks.push_back(move(task));
>             return;
>         }
>
>         // Otherwise the list will only have a single item containing a null
>         // reference which will be removed by ThreadPool::EventProc as soon
>         // as the OnTaskComplete() member function of the new task returns.
>         // It is our responsibility to start the new task immediately.
>         tasks.emplace_back();
>     }
>     data->DueTasks.push_back(move(task));
>     data.SendSignal();
> }
>
>
> Internal data structure accessed with mutual exclusion:
>
> struct ThreadPool::impl
> {
>     struct ExclusiveData {
>         struct CompareTasks {
>             bool operator()(
>                 const shared_ptr<BackgroundTask> &left,
>                 const shared_ptr<BackgroundTask> &right)
>             {
>                 return left->DueTime > right->DueTime;
>             }
>         };
>
>         priority_queue<
>             shared_ptr<BackgroundTask>,
>             deque<shared_ptr<BackgroundTask>>,
>             CompareTasks
>         > ScheduledTasks;
>
>         list<shared_ptr<BackgroundTask>> DueTasks;
>         list<shared_ptr<BackgroundTask>> CompletedTasks;
>         struct event *Event = 0;
>         bool MustExit = false;
>     };
>
>     list<pthread_t> Threads;
>     SharedDataContainer<ExclusiveData> Data;
>     map<size_t, list<shared_ptr<BackgroundTask>>> TasksByType;
> };
>
>
> I have performed various tests and concluded that the faulty behavior
> occurs only when real network I/O is used fairly intensively. For example,
> the following simple test does not reproduce the infinite loop in
> event_base_dispatch(), no matter how long it runs.
>
>
> struct TestTask : BackgroundTask {
>     int number;
>
>     TestTask()
>     {
>         static int seq = 0;
>         number = ++seq;
>     }
>
>     void BackgroundWorker() override
>     {
>         Scope;
>         for(int i = 0; i < 1000; i++) {
>             Trace << "blah " << number;
>         }
>     }
>
>     void OnTaskComplete(GenericService *service) override
>     {
>         RestartDelay = 1;
>     }
> };
>
> void TestThreadPool(GenericService *service)
> {
>     service->pool->RunTask(new TestTask);
>     service->pool->RunTask(new TestTask);
>     service->pool->RunTask(new TestTask);
>     service->pool->RunTask(new TestTask);
>     service->pool->RunTask(new TestTask);
>     service->pool->RunTask(new TestTask);
> }
>
> Best regards,
> Sten Kultakangas