Author: Armin Rigo <[email protected]>
Branch: stm-gc
Changeset: r52371:dd0c59caf8e2
Date: 2012-02-11 12:31 +0100
http://bitbucket.org/pypy/pypy/changeset/dd0c59caf8e2/

Log:    Attempt to rewrite things so that there is no GC allocation in the
        non-main threads outside transactions.

diff --git a/pypy/module/transaction/interp_epoll.py b/pypy/module/transaction/interp_epoll.py
--- a/pypy/module/transaction/interp_epoll.py
+++ b/pypy/module/transaction/interp_epoll.py
@@ -7,13 +7,23 @@
 from pypy.rpython.lltypesystem import lltype, rffi
 from pypy.interpreter.gateway import unwrap_spec
 from pypy.interpreter.error import OperationError
+from pypy.module.select import interp_epoll
 from pypy.module.select.interp_epoll import W_Epoll, FD_SETSIZE
 from pypy.module.select.interp_epoll import epoll_event
-from pypy.module.select.interp_epoll import epoll_wait
 from pypy.module.transaction import interp_transaction
 from pypy.rlib import rposix
 
 
+# a _nowrapper version, to be sure that it does not allocate anything
+_epoll_wait = rffi.llexternal(
+    "epoll_wait",
+    [rffi.INT, lltype.Ptr(rffi.CArray(epoll_event)), rffi.INT, rffi.INT],
+    rffi.INT,
+    compilation_info=eci,
+    _nowrapper=True
+)
+
+
 class EPollPending(interp_transaction.AbstractPending):
     def __init__(self, space, epoller, w_callback):
         self.space = space
@@ -21,26 +31,48 @@
         self.w_callback = w_callback
 
     def run(self):
-        # this code is run non-transactionally
+        # This code is run non-transactionally.  Careful, no GC available.
         state = interp_transaction.state
         if state.has_exception():
             return
         maxevents = FD_SETSIZE - 1    # for now
-        timeout = 500                 # for now: half a second
-        with lltype.scoped_alloc(rffi.CArray(epoll_event), maxevents) as evs:
-            nfds = epoll_wait(self.epoller.epfd, evs, maxevents, int(timeout))
-            if nfds < 0:
-                errno = rposix.get_errno()
-                if errno == EINTR:
-                    nfds = 0    # ignore, just wait for more later
-                else:
-                    state.got_exception_errno = errno
-                    state.must_reraise_exception(_reraise_from_errno)
-                    return
-            for i in range(nfds):
-                event = evs[i]
-                fd = rffi.cast(lltype.Signed, event.c_data.c_fd)
-                PendingCallback(self.w_callback, fd, event.c_events).register()
+        evs = lltype.malloc(rffi.CArray(epoll_event), maxevents, flavor='raw')
+        try:
+            self.wait_and_process_events(evs, maxevents)
+        finally:
+            lltype.free(evs, flavor='raw')
+
+    def wait_and_process_events(self, evs, maxevents):
+        fd = rffi.cast(rffi.INT, self.epoller.epfd)
+        maxevents = rffi.cast(rffi.INT, maxevents)
+        timeout = rffi.cast(rffi.INT, 500)     # for now: half a second
+        nfds = _epoll_wait(fd, evs, maxevents, timeout)
+        nfds = rffi.cast(lltype.Signed, nfds)
+        #
+        if nfds < 0:
+            errno = rposix.get_errno()
+            if errno == EINTR:
+                nfds = 0    # ignore, just wait for more later
+            else:
+                state.got_exception_errno = errno
+                state.must_reraise_exception(_reraise_from_errno)
+                return
+        # We have to allocate new PendingCallback objects, but we can't
+        # allocate anything here because we are not running transactionally.
+        # Workaround for now: run a new tiny transaction just to create
+        # and register these PendingCallback's.
+        self.evs = evs
+        self.nfds = nfds
+        rstm.perform_transaction(EPollPending._add_real_transactions,
+                                 EPollPending, self)
+
+    @staticmethod
+    def _add_real_transactions(self, retry_counter):
+        evs = self.evs
+        for i in range(self.nfds):
+            event = evs[i]
+            fd = rffi.cast(lltype.Signed, event.c_data.c_fd)
+            PendingCallback(self.w_callback, fd, event.c_events).register()
         # re-register myself to epoll_wait() for more
         self.register()
 
diff --git a/pypy/module/transaction/interp_transaction.py b/pypy/module/transaction/interp_transaction.py
--- a/pypy/module/transaction/interp_transaction.py
+++ b/pypy/module/transaction/interp_transaction.py
@@ -3,12 +3,11 @@
 from pypy.module.transaction import threadintf
 from pypy.module.transaction.fifo import Fifo
 from pypy.rlib import rstm
+from pypy.rlib.debug import ll_assert
 
 
 NUM_THREADS_DEFAULT = 4     # by default
 
-MAIN_THREAD_ID = 0
-
 
 class State(object):
 
@@ -22,6 +21,7 @@
         self.ll_no_tasks_pending_lock = threadintf.null_ll_lock
         self.ll_unfinished_lock = threadintf.null_ll_lock
         self.threadobjs = {}      # empty during translation
+        self.main_thread_id = 0
         self.pending = Fifo()
 
     def _freeze_(self):
@@ -69,11 +69,15 @@
 
     def setvalue(self, value):
         id = rstm.thread_id()
-        assert id == MAIN_THREAD_ID   # should not be used from a transaction
+        if self.main_thread_id == 0:
+            self.main_thread_id = id
+        else:
+            # this should not be used from a transaction
+            assert id == self.main_thread_id
         self.threadobjs[id] = value
 
     def getmainthreadvalue(self):
-        return self.threadobjs[0]
+        return self.threadobjs[self.main_thread_id]
 
     def getallvalues(self):
         return self.threadobjs
@@ -90,28 +94,38 @@
         self.num_threads = num
 
     def lock(self):
-        # XXX think about the interaction between locks and the GC
+        """Acquire the main lock.  This plays a role similar to the GIL
+        in that it must be acquired in order to have the _run_thread()
+        code execute; but it is released around every execution of a
+        transaction."""
         threadintf.acquire(self.ll_lock, True)
 
     def unlock(self):
+        """Release the main lock."""
         threadintf.release(self.ll_lock)
 
     def lock_no_tasks_pending(self):
+        """This lock is acquired when state.pending.is_empty()."""
         threadintf.acquire(self.ll_no_tasks_pending_lock, True)
 
     def unlock_no_tasks_pending(self):
+        """Release the ll_no_tasks_pending_lock."""
         threadintf.release(self.ll_no_tasks_pending_lock)
 
     def is_locked_no_tasks_pending(self):
+        """Test ll_no_tasks_pending_lock for debugging."""
         just_locked = threadintf.acquire(self.ll_no_tasks_pending_lock, False)
         if just_locked:
             threadintf.release(self.ll_no_tasks_pending_lock)
         return not just_locked
 
     def lock_unfinished(self):
+        """This lock is normally acquired.  It is released when all threads
+        are done."""
         threadintf.acquire(self.ll_unfinished_lock, True)
 
     def unlock_unfinished(self):
+        """Release ll_unfinished_lock."""
         threadintf.release(self.ll_unfinished_lock)
 
     def init_exceptions(self):
@@ -142,6 +156,11 @@
     _alloc_nonmovable_ = True
 
     def register(self):
+        """Register this AbstractPending instance in the pending list
+        belonging to the current thread.  If called from the main
+        thread, it is the global list.  If called from a transaction,
+        it is a thread-local list that will be merged with the global
+        list when the transaction is done."""
         ec = state.getvalue()
         ec._transaction_pending.append(self)
 
@@ -192,16 +211,27 @@
         state.unlock_no_tasks_pending()
 
 
-def _run_thread():
-    state.lock()
-    rstm.descriptor_init()
+def _setup_thread(_, retry_counter):
+    """Setup a thread.  Run as a transaction because it allocates."""
     my_thread_id = rstm.thread_id()
     my_ec = state.space.createexecutioncontext()
     state.add_thread(my_thread_id, my_ec)
+
+
+def _run_thread():
+    """The main function running one of the threads."""
+    # Note that we cannot allocate any object here outside a transaction,
+    # so we need to be very careful.
+    state.lock()
+    rstm.descriptor_init()
+    #
+    rstm.perform_transaction(_setup_thread, AbstractPending, None)
+    my_transactions_pending = state.getvalue()._transaction_pending
     #
     while True:
         if state.pending.is_empty():
-            assert state.is_locked_no_tasks_pending()
+            ll_assert(state.is_locked_no_tasks_pending(),
+                      "inconsistently unlocked no_tasks_pending")
             state.num_waiting_threads += 1
             if state.num_waiting_threads == state.num_threads:
                 state.finished = True
@@ -222,9 +252,9 @@
             state.unlock()
             pending.run()
             state.lock()
-            _add_list(my_ec._transaction_pending)
+            _add_list(my_transactions_pending)
     #
-    state.del_thread(my_thread_id)
+    state.del_thread(rstm.thread_id())
     rstm.descriptor_done()
     if state.num_waiting_threads == 0:    # only the last thread to leave
         state.unlock_unfinished()
@@ -240,20 +270,31 @@
     assert not state.is_locked_no_tasks_pending()
     if state.pending.is_empty():
         return
+    #
+    # 'num_waiting_threads' is the number of threads that are currently
+    # waiting for more work to do.  When it becomes equal to
+    # 'num_threads' then we are done: we set 'finished' to True and this
+    # causes all threads to leave.  Only accessed during a
+    # 'state.lock'-protected region.
     state.num_waiting_threads = 0
     state.finished = False
+    #
     state.running = True
     state.init_exceptions()
     #
+    # --- start the threads --- don't use the GC here any more! ---
     for i in range(state.num_threads):
         threadintf.start_new_thread(_run_thread, ())
     #
     state.lock_unfinished()  # wait for all threads to finish
+    # --- done, we can use the GC again ---
     #
     assert state.num_waiting_threads == 0
     assert state.pending.is_empty()
     assert state.threadobjs.keys() == [MAIN_THREAD_ID]
     assert not state.is_locked_no_tasks_pending()
+    assert state.threadobjects == []
+    state.threadobjects = None
     state.running = False
     #
     # now re-raise the exception that we got in a transaction
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to