Author: gonzalo
Date: 2005-04-12 17:42:15 -0400 (Tue, 12 Apr 2005)
New Revision: 42880
Modified:
trunk/mono/mono/metadata/ChangeLog
trunk/mono/mono/metadata/threadpool.c
Log:
2005-04-12 Gonzalo Paniagua Javier <[EMAIL PROTECTED]>
* threadpool.c: added epoll() based implementation for asynchronous IO
that is used instead of the default poll() when available.
It can be disabled by setting MONO_DISABLE_AIO.
Modified: trunk/mono/mono/metadata/ChangeLog
===================================================================
--- trunk/mono/mono/metadata/ChangeLog 2005-04-12 21:03:02 UTC (rev 42879)
+++ trunk/mono/mono/metadata/ChangeLog 2005-04-12 21:42:15 UTC (rev 42880)
@@ -1,5 +1,11 @@
2005-04-12 Gonzalo Paniagua Javier <[EMAIL PROTECTED]>
+ * threadpool.c: added epoll() based implementation for asynchronous IO
+ that is used instead of the default poll() when available.
+ It can be disabled by setting MONO_DISABLE_AIO.
+
+2005-04-12 Gonzalo Paniagua Javier <[EMAIL PROTECTED]>
+
* threadpool.c: windows needs 'closesocket' and instead of returning
0 when the stream is closed while in select, it returns -1. Fixes bug
#74573.
Modified: trunk/mono/mono/metadata/threadpool.c
===================================================================
--- trunk/mono/mono/metadata/threadpool.c 2005-04-12 21:03:02 UTC (rev
42879)
+++ trunk/mono/mono/metadata/threadpool.c 2005-04-12 21:42:15 UTC (rev
42880)
@@ -39,11 +39,15 @@
#include <string.h>
#include <mono/utils/mono-poll.h>
+#ifdef HAVE_EPOLL
+#include <sys/epoll.h>
+#endif
#include "mono/io-layer/socket-wrappers.h"
#include "threadpool.h"
+#undef EPOLL_DEBUG
/* maximum number of worker threads */
static int mono_max_worker_threads = THREADS_PER_CPU;
static int mono_min_worker_threads = 0;
@@ -75,6 +79,10 @@
HANDLE new_sem; /* access to newpfd and write side of the pipe */
mono_pollfd *newpfd;
+ gboolean epoll_disabled;
+#ifdef HAVE_EPOLL
+ int epollfd;
+#endif
} SocketIOData;
static SocketIOData socket_io_data;
@@ -133,7 +141,8 @@
#endif
data->pipe [0] = -1;
data->pipe [1] = -1;
- CloseHandle (data->new_sem);
+ if (data->new_sem)
+ CloseHandle (data->new_sem);
data->new_sem = NULL;
g_hash_table_destroy (data->sock_to_state);
data->sock_to_state = NULL;
@@ -141,6 +150,10 @@
async_io_queue = NULL;
g_free (data->newpfd);
data->newpfd = NULL;
+#ifdef HAVE_EPOLL
+ if (FALSE == data->epoll_disabled)
+ close (data->epollfd);
+#endif
LeaveCriticalSection (&data->io_lock);
}
@@ -157,7 +170,7 @@
case AIO_OP_CONNECT:
return MONO_POLLOUT;
default: /* Should never happen */
- g_print ("socket_io_add: unknown value in switch!!!\n");
+ g_print ("get_event_from_state: unknown value in switch!!!\n");
return 0;
}
}
@@ -275,12 +288,16 @@
state = (MonoSocketAsyncResult *) list->data;
if (get_event_from_state (state) == event)
break;
+
list = list->next;
}
if (list != NULL) {
oldlist = g_slist_remove_link (oldlist, list);
g_slist_free_1 (list);
+#ifdef EPOLL_DEBUG
+ g_print ("Dispatching event %d on socket %d\n", event,
state->handle);
+#endif
InterlockedIncrement (&pending_io_items);
start_io_thread_or_queue (state->ares);
}
@@ -313,7 +330,7 @@
}
static void
-socket_io_main (gpointer p)
+socket_io_poll_main (gpointer p)
{
#define INITIAL_POLLFD_SIZE 1024
#define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
@@ -324,7 +341,6 @@
gint i;
MonoThread *thread;
-
thread = mono_thread_current ();
thread->threadpool_thread = TRUE;
thread->state |= ThreadState_Background;
@@ -452,6 +468,110 @@
}
}
+#ifdef HAVE_EPOLL
+#define EPOLL_ERRORS (EPOLLERR | EPOLLHUP)
+static void
+socket_io_epoll_main (gpointer p)
+{
+ SocketIOData *data;
+ int epollfd;
+ MonoThread *thread;
+ struct epoll_event *events, *evt;
+ const int nevents = 512;
+ int ready = 0, i;
+
+ data = p;
+ epollfd = data->epollfd;
+ thread = mono_thread_current ();
+ thread->threadpool_thread = TRUE;
+ thread->state |= ThreadState_Background;
+ events = g_new0 (struct epoll_event, nevents);
+
+ while (1) {
+ do {
+ if (ready == -1) {
+ if ((thread->state & ThreadState_StopRequested)
!= 0) {
+ g_free (events);
+ close (epollfd);
+ mono_thread_interruption_checkpoint ();
+ g_assert_not_reached ();
+ }
+ }
+#ifdef EPOLL_DEBUG
+ g_print ("epoll_wait init\n");
+#endif
+ ready = epoll_wait (epollfd, events, nevents, -1);
+#ifdef EPOLL_DEBUG
+ {
+ int err = errno;
+ g_print ("epoll_wait end with %d ready sockets.\n",
ready);
+ errno = err;
+ }
+#endif
+ } while (ready == -1 && errno == EINTR);
+
+ if (ready == -1) {
+ int err = errno;
+ g_warning ("epoll_wait: %d %s\n", err, g_strerror
(err));
+ g_free (events);
+ close (epollfd);
+ return;
+ }
+
+ EnterCriticalSection (&data->io_lock);
+ if (data->inited == 0) {
+#ifdef EPOLL_DEBUG
+ g_print ("data->inited == 0\n");
+#endif
+ g_free (events);
+ close (epollfd);
+ return; /* cleanup called */
+ }
+
+ for (i = 0; i < ready; i++) {
+ int fd;
+ GSList *list;
+
+ evt = &events [i];
+ fd = evt->data.fd;
+ list = g_hash_table_lookup (data->sock_to_state,
GINT_TO_POINTER (fd));
+#ifdef EPOLL_DEBUG
+ g_print ("Event %d on %d list length: %d\n",
evt->events, fd, g_slist_length (list));
+#endif
+ if (list != NULL && (evt->events & (EPOLLIN |
EPOLL_ERRORS)) != 0) {
+ list = process_io_event (list, MONO_POLLIN);
+ }
+
+ if (list != NULL && (evt->events & (EPOLLOUT |
EPOLL_ERRORS)) != 0) {
+ list = process_io_event (list, MONO_POLLOUT);
+ }
+
+ if (list != NULL) {
+ g_hash_table_replace (data->sock_to_state,
GINT_TO_POINTER (fd), list);
+ evt->events = get_events_from_list (list);
+#ifdef EPOLL_DEBUG
+ g_print ("MOD %d to %d\n", fd, evt->events);
+#endif
+ if (epoll_ctl (epollfd, EPOLL_CTL_MOD, fd,
evt)) {
+ int err = errno;
+ g_message ("epoll_ctl(MOD): %d %s",
err, g_strerror (err));
+ }
+ } else {
+ g_hash_table_remove (data->sock_to_state,
GINT_TO_POINTER (fd));
+#ifdef EPOLL_DEBUG
+ g_print ("DEL %d\n", fd);
+#endif
+ if (epoll_ctl (epollfd, EPOLL_CTL_DEL, fd,
NULL)) {
+ int err = errno;
+ g_message ("epoll_ctl(DEL): %d %s",
err, g_strerror (err));
+ }
+ }
+ }
+ LeaveCriticalSection (&data->io_lock);
+ }
+}
+#endif
+
#ifdef PLATFORM_WIN32
static void
connect_hack (gpointer x)
@@ -467,8 +587,8 @@
}
}
}
+#endif
-#endif
static void
socket_io_init (SocketIOData *data)
{
@@ -491,6 +611,21 @@
return;
}
+#ifdef HAVE_EPOLL
+ data->epoll_disabled = (g_getenv ("MONO_DISABLE_AIO") != NULL);
+ if (FALSE == data->epoll_disabled) {
+ data->epollfd = epoll_create (256);
+ data->epoll_disabled = (data->epollfd == -1);
+ if (data->epoll_disabled && g_getenv ("MONO_DEBUG"))
+ g_message ("epoll_create() failed. Using plain
poll().");
+ } else {
+ data->epollfd = -1;
+ }
+#else
+ data->epollfd = -1;
+ data->epoll_disabled = TRUE;
+#endif
+
#ifndef PLATFORM_WIN32
if (pipe (data->pipe) != 0) {
int err = errno;
@@ -522,10 +657,19 @@
#endif
data->sock_to_state = g_hash_table_new (g_direct_hash, g_direct_equal);
- data->new_sem = CreateSemaphore (NULL, 1, 1, NULL);
+
+ if (FALSE == data->epoll_disabled)
+ data->new_sem = CreateSemaphore (NULL, 1, 1, NULL);
io_job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
InitializeCriticalSection (&io_queue_lock);
- mono_thread_create (mono_get_root_domain (), socket_io_main, data);
+ if (data->epoll_disabled) {
+ mono_thread_create (mono_get_root_domain (),
socket_io_poll_main, data);
+ }
+#ifdef HAVE_EPOLL
+ else {
+ mono_thread_create (mono_get_root_domain (),
socket_io_epoll_main, data);
+ }
+#endif
InterlockedCompareExchange (&data->inited, 1, 0);
LeaveCriticalSection (&data->io_lock);
}
@@ -563,14 +707,65 @@
#endif
}
+#ifdef HAVE_EPOLL
+static gboolean
+socket_io_add_epoll (MonoSocketAsyncResult *state)
+{
+ GSList *list;
+ SocketIOData *data = &socket_io_data;
+ struct epoll_event event;
+ int epoll_op, ievt;
+ int fd;
+
+ memset (&event, 0, sizeof (struct epoll_event));
+ fd = GPOINTER_TO_INT (state->handle);
+ EnterCriticalSection (&data->io_lock);
+ list = g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd));
+ if (list == NULL) {
+ list = g_slist_alloc ();
+ list->data = state;
+ epoll_op = EPOLL_CTL_ADD;
+ } else {
+ list = g_slist_append (list, state);
+ epoll_op = EPOLL_CTL_MOD;
+ }
+
+ ievt = get_events_from_list (list);
+ if ((ievt & MONO_POLLIN) != 0)
+ event.events |= EPOLLIN;
+ if ((ievt & MONO_POLLOUT) != 0)
+ event.events |= EPOLLOUT;
+
+ g_hash_table_replace (data->sock_to_state, state->handle, list);
+ event.data.fd = fd;
+#ifdef EPOLL_DEBUG
+ g_print ("%s %d with %d\n", epoll_op == EPOLL_CTL_ADD ? "ADD" : "MOD",
fd, event.events);
+#endif
+ if (epoll_ctl (data->epollfd, epoll_op, fd, &event) == -1) {
+ int err = errno;
+ g_message ("epoll_ctl(ADD): %d %s\n", err, g_strerror (err));
+ epoll_op = EPOLL_CTL_MOD;
+ if (epoll_ctl (data->epollfd, epoll_op, fd, &event) == -1) {
+ g_message ("epoll_ctl(MOD): %d %s\n", err, g_strerror
(err));
+ }
+ }
+
+ LeaveCriticalSection (&data->io_lock);
+ return TRUE;
+}
+#endif
+
static void
socket_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *state)
{
- int op = state->operation;
-
socket_io_init (&socket_io_data);
state->ares = ares;
- op = op;
+#ifdef HAVE_EPOLL
+ if (socket_io_data.epoll_disabled == FALSE) {
+ if (socket_io_add_epoll (state))
+ return;
+ }
+#endif
socket_io_add_poll (state);
}
_______________________________________________
Mono-patches maillist - [email protected]
http://lists.ximian.com/mailman/listinfo/mono-patches