Author: gonzalo
Date: 2005-04-12 17:42:15 -0400 (Tue, 12 Apr 2005)
New Revision: 42880

Modified:
   trunk/mono/mono/metadata/ChangeLog
   trunk/mono/mono/metadata/threadpool.c
Log:
2005-04-12 Gonzalo Paniagua Javier <[EMAIL PROTECTED]>

        * threadpool.c: added epoll() based implementation for asynchronous IO
        that is used instead of the default poll() when available.
        It can be disabled by setting MONO_DISABLE_AIO.



Modified: trunk/mono/mono/metadata/ChangeLog
===================================================================
--- trunk/mono/mono/metadata/ChangeLog  2005-04-12 21:03:02 UTC (rev 42879)
+++ trunk/mono/mono/metadata/ChangeLog  2005-04-12 21:42:15 UTC (rev 42880)
@@ -1,5 +1,11 @@
 2005-04-12 Gonzalo Paniagua Javier <[EMAIL PROTECTED]>
 
+       * threadpool.c: added epoll() based implementation for asynchronous IO
+       that is used instead of the default poll() when available.
+       It can be disabled by setting MONO_DISABLE_AIO.
+
+2005-04-12 Gonzalo Paniagua Javier <[EMAIL PROTECTED]>
+
        * threadpool.c: windows needs 'closesocket' and instead of returning
        0 when the stream is closed while in select, it returns -1. Fixes bug
        #74573.

Modified: trunk/mono/mono/metadata/threadpool.c
===================================================================
--- trunk/mono/mono/metadata/threadpool.c       2005-04-12 21:03:02 UTC (rev 
42879)
+++ trunk/mono/mono/metadata/threadpool.c       2005-04-12 21:42:15 UTC (rev 
42880)
@@ -39,11 +39,15 @@
 #include <string.h>
 
 #include <mono/utils/mono-poll.h>
+#ifdef HAVE_EPOLL
+#include <sys/epoll.h>
+#endif
 
 #include "mono/io-layer/socket-wrappers.h"
 
 #include "threadpool.h"
 
+#undef EPOLL_DEBUG
 /* maximum number of worker threads */
 static int mono_max_worker_threads = THREADS_PER_CPU;
 static int mono_min_worker_threads = 0;
@@ -75,6 +79,10 @@
 
        HANDLE new_sem; /* access to newpfd and write side of the pipe */
        mono_pollfd *newpfd;
+       gboolean epoll_disabled;
+#ifdef HAVE_EPOLL
+       int epollfd;
+#endif
 } SocketIOData;
 
 static SocketIOData socket_io_data;
@@ -133,7 +141,8 @@
 #endif
        data->pipe [0] = -1;
        data->pipe [1] = -1;
-       CloseHandle (data->new_sem);
+       if (data->new_sem)
+               CloseHandle (data->new_sem);
        data->new_sem = NULL;
        g_hash_table_destroy (data->sock_to_state);
        data->sock_to_state = NULL;
@@ -141,6 +150,10 @@
        async_io_queue = NULL;
        g_free (data->newpfd);
        data->newpfd = NULL;
+#ifdef HAVE_EPOLL
+       if (FALSE == data->epoll_disabled)
+               close (data->epollfd);
+#endif
        LeaveCriticalSection (&data->io_lock);
 }
 
@@ -157,7 +170,7 @@
        case AIO_OP_CONNECT:
                return MONO_POLLOUT;
        default: /* Should never happen */
-               g_print ("socket_io_add: unknown value in switch!!!\n");
+               g_print ("get_event_from_state: unknown value in switch!!!\n");
                return 0;
        }
 }
@@ -275,12 +288,16 @@
                state = (MonoSocketAsyncResult *) list->data;
                if (get_event_from_state (state) == event)
                        break;
+               
                list = list->next;
        }
 
        if (list != NULL) {
                oldlist = g_slist_remove_link (oldlist, list);
                g_slist_free_1 (list);
+#ifdef EPOLL_DEBUG
+               g_print ("Dispatching event %d on socket %d\n", event, 
state->handle);
+#endif
                InterlockedIncrement (&pending_io_items);
                start_io_thread_or_queue (state->ares);
        }
@@ -313,7 +330,7 @@
 }
 
 static void
-socket_io_main (gpointer p)
+socket_io_poll_main (gpointer p)
 {
 #define INITIAL_POLLFD_SIZE    1024
 #define POLL_ERRORS (MONO_POLLERR | MONO_POLLHUP | MONO_POLLNVAL)
@@ -324,7 +341,6 @@
        gint i;
        MonoThread *thread;
 
- 
        thread = mono_thread_current ();
        thread->threadpool_thread = TRUE;
        thread->state |= ThreadState_Background;
@@ -452,6 +468,110 @@
        }
 }
 
+#ifdef HAVE_EPOLL
+#define EPOLL_ERRORS (EPOLLERR | EPOLLHUP)
+static void
+socket_io_epoll_main (gpointer p)
+{
+       SocketIOData *data;
+       int epollfd;
+       MonoThread *thread;
+       struct epoll_event *events, *evt;
+       const int nevents = 512;
+       int ready = 0, i;
+
+       data = p;
+       epollfd = data->epollfd;
+       thread = mono_thread_current ();
+       thread->threadpool_thread = TRUE;
+       thread->state |= ThreadState_Background;
+       events = g_new0 (struct epoll_event, nevents);
+
+       while (1) {
+               do {
+                       if (ready == -1) {
+                               if ((thread->state & ThreadState_StopRequested) 
!= 0) {
+                                       g_free (events);
+                                       close (epollfd);
+                                       mono_thread_interruption_checkpoint ();
+                                       g_assert_not_reached ();
+                               }
+                       }
+#ifdef EPOLL_DEBUG
+                       g_print ("epoll_wait init\n");
+#endif
+                       ready = epoll_wait (epollfd, events, nevents, -1);
+#ifdef EPOLL_DEBUG
+                       {
+                       int err = errno;
+                       g_print ("epoll_wait end with %d ready sockets.\n", 
ready);
+                       errno = err;
+                       }
+#endif
+               } while (ready == -1 && errno == EINTR);
+
+               if (ready == -1) {
+                       int err = errno;
+                       g_warning ("epoll_wait: %d %s\n", err, g_strerror 
(err));
+                       g_free (events);
+                       close (epollfd);
+                       return;
+               }
+
+               EnterCriticalSection (&data->io_lock);
+               if (data->inited == 0) {
+#ifdef EPOLL_DEBUG
+                       g_print ("data->inited == 0\n");
+#endif
+                       g_free (events);
+                       close (epollfd);
+                       return; /* cleanup called */
+               }
+
+               for (i = 0; i < ready; i++) {
+                       int fd;
+                       GSList *list;
+
+                       evt = &events [i];
+                       fd = evt->data.fd;
+                       list = g_hash_table_lookup (data->sock_to_state, 
GINT_TO_POINTER (fd));
+#ifdef EPOLL_DEBUG
+                       g_print ("Event %d on %d list length: %d\n", 
evt->events, fd, g_slist_length (list));
+#endif
+                       if (list != NULL && (evt->events & (EPOLLIN | 
EPOLL_ERRORS)) != 0) {
+                               list = process_io_event (list, MONO_POLLIN);
+                       }
+
+                       if (list != NULL && (evt->events & (EPOLLOUT | 
EPOLL_ERRORS)) != 0) {
+                               list = process_io_event (list, MONO_POLLOUT);
+                       }
+
+                       if (list != NULL) {
+                               g_hash_table_replace (data->sock_to_state, 
GINT_TO_POINTER (fd), list);
+                               evt->events = get_events_from_list (list);
+#ifdef EPOLL_DEBUG
+                       g_print ("MOD %d to %d\n", fd, evt->events);
+#endif
+                               if (epoll_ctl (epollfd, EPOLL_CTL_MOD, fd, 
evt)) {
+                                       int err = errno;
+                                       g_message ("epoll_ctl(MOD): %d %s", 
err, g_strerror (err));
+                               }
+                       } else {
+                               g_hash_table_remove (data->sock_to_state, 
GINT_TO_POINTER (fd));
+#ifdef EPOLL_DEBUG
+                       g_print ("DEL %d\n", fd);
+#endif
+                               if (epoll_ctl (epollfd, EPOLL_CTL_DEL, fd, 
NULL)) {
+                                       int err = errno;
+                                       g_message ("epoll_ctl(DEL): %d %s", 
err, g_strerror (err));
+                               }
+                       }
+               }
+               LeaveCriticalSection (&data->io_lock);
+       }
+}
+#endif
+
 #ifdef PLATFORM_WIN32
 static void
 connect_hack (gpointer x)
@@ -467,8 +587,8 @@
                }
        }
 }
+#endif
 
-#endif
 static void
 socket_io_init (SocketIOData *data)
 {
@@ -491,6 +611,21 @@
                return;
        }
 
+#ifdef HAVE_EPOLL
+       data->epoll_disabled = (g_getenv ("MONO_DISABLE_AIO") != NULL);
+       if (FALSE == data->epoll_disabled) {
+               data->epollfd = epoll_create (256);
+               data->epoll_disabled = (data->epollfd == -1);
+               if (data->epoll_disabled && g_getenv ("MONO_DEBUG"))
+                       g_message ("epoll_create() failed. Using plain 
poll().");
+       } else {
+               data->epollfd = -1;
+       }
+#else
+       data->epollfd = -1;
+       data->epoll_disabled = TRUE;
+#endif
+
 #ifndef PLATFORM_WIN32
        if (pipe (data->pipe) != 0) {
                int err = errno;
@@ -522,10 +657,19 @@
 #endif
 
        data->sock_to_state = g_hash_table_new (g_direct_hash, g_direct_equal);
-       data->new_sem = CreateSemaphore (NULL, 1, 1, NULL);
+
+       if (FALSE == data->epoll_disabled)
+               data->new_sem = CreateSemaphore (NULL, 1, 1, NULL);
        io_job_added = CreateSemaphore (NULL, 0, 0x7fffffff, NULL);
        InitializeCriticalSection (&io_queue_lock);
-       mono_thread_create (mono_get_root_domain (), socket_io_main, data);
+       if (data->epoll_disabled) {
+               mono_thread_create (mono_get_root_domain (), 
socket_io_poll_main, data);
+       }
+#ifdef HAVE_EPOLL
+       else {
+               mono_thread_create (mono_get_root_domain (), 
socket_io_epoll_main, data);
+       }
+#endif
        InterlockedCompareExchange (&data->inited, 1, 0);
        LeaveCriticalSection (&data->io_lock);
 }
@@ -563,14 +707,65 @@
 #endif
 }
 
+#ifdef HAVE_EPOLL
+static gboolean
+socket_io_add_epoll (MonoSocketAsyncResult *state)
+{
+       GSList *list;
+       SocketIOData *data = &socket_io_data;
+       struct epoll_event event;
+       int epoll_op, ievt;
+       int fd;
+
+       memset (&event, 0, sizeof (struct epoll_event));
+       fd = GPOINTER_TO_INT (state->handle);
+       EnterCriticalSection (&data->io_lock);
+       list = g_hash_table_lookup (data->sock_to_state, GINT_TO_POINTER (fd));
+       if (list == NULL) {
+               list = g_slist_alloc ();
+               list->data = state;
+               epoll_op = EPOLL_CTL_ADD;
+       } else {
+               list = g_slist_append (list, state);
+               epoll_op = EPOLL_CTL_MOD;
+       }
+
+       ievt = get_events_from_list (list);
+       if ((ievt & MONO_POLLIN) != 0)
+               event.events |= EPOLLIN;
+       if ((ievt & MONO_POLLOUT) != 0)
+               event.events |= EPOLLOUT;
+
+       g_hash_table_replace (data->sock_to_state, state->handle, list);
+       event.data.fd = fd;
+#ifdef EPOLL_DEBUG
+       g_print ("%s %d with %d\n", epoll_op == EPOLL_CTL_ADD ? "ADD" : "MOD", 
fd, event.events);
+#endif
+       if (epoll_ctl (data->epollfd, epoll_op, fd, &event) == -1) {
+               int err = errno;
+               g_message ("epoll_ctl(ADD): %d %s\n", err, g_strerror (err));
+               epoll_op = EPOLL_CTL_MOD;
+               if (epoll_ctl (data->epollfd, epoll_op, fd, &event) == -1) {
+                       g_message ("epoll_ctl(MOD): %d %s\n", err, g_strerror 
(err));
+               }
+       }
+
+       LeaveCriticalSection (&data->io_lock);
+       return TRUE;
+}
+#endif
+
 static void
 socket_io_add (MonoAsyncResult *ares, MonoSocketAsyncResult *state)
 {
-       int op = state->operation;
-
        socket_io_init (&socket_io_data);
        state->ares = ares;
-       op = op;
+#ifdef HAVE_EPOLL
+       if (socket_io_data.epoll_disabled == FALSE) {
+               if (socket_io_add_epoll (state))
+                       return;
+       }
+#endif
        socket_io_add_poll (state);
 }
 

_______________________________________________
Mono-patches maillist  -  [email protected]
http://lists.ximian.com/mailman/listinfo/mono-patches

Reply via email to