Author: Armin Rigo <[email protected]>
Branch: stm-gc
Changeset: r52371:dd0c59caf8e2
Date: 2012-02-11 12:31 +0100
http://bitbucket.org/pypy/pypy/changeset/dd0c59caf8e2/
Log: Attempt to rewrite things so that there is no GC allocation in the
non-main threads outside transactions.
diff --git a/pypy/module/transaction/interp_epoll.py b/pypy/module/transaction/interp_epoll.py
--- a/pypy/module/transaction/interp_epoll.py
+++ b/pypy/module/transaction/interp_epoll.py
@@ -7,13 +7,23 @@
from pypy.rpython.lltypesystem import lltype, rffi
from pypy.interpreter.gateway import unwrap_spec
from pypy.interpreter.error import OperationError
+from pypy.module.select import interp_epoll
from pypy.module.select.interp_epoll import W_Epoll, FD_SETSIZE
from pypy.module.select.interp_epoll import epoll_event
-from pypy.module.select.interp_epoll import epoll_wait
from pypy.module.transaction import interp_transaction
from pypy.rlib import rposix
+# a _nowrapper version, to be sure that it does not allocate anything
+_epoll_wait = rffi.llexternal(
+ "epoll_wait",
+ [rffi.INT, lltype.Ptr(rffi.CArray(epoll_event)), rffi.INT, rffi.INT],
+ rffi.INT,
+ compilation_info=eci,
+ _nowrapper=True
+)
+
+
class EPollPending(interp_transaction.AbstractPending):
def __init__(self, space, epoller, w_callback):
self.space = space
@@ -21,26 +31,48 @@
self.w_callback = w_callback
def run(self):
- # this code is run non-transactionally
+ # This code is run non-transactionally. Careful, no GC available.
state = interp_transaction.state
if state.has_exception():
return
maxevents = FD_SETSIZE - 1 # for now
- timeout = 500 # for now: half a second
- with lltype.scoped_alloc(rffi.CArray(epoll_event), maxevents) as evs:
- nfds = epoll_wait(self.epoller.epfd, evs, maxevents, int(timeout))
- if nfds < 0:
- errno = rposix.get_errno()
- if errno == EINTR:
- nfds = 0 # ignore, just wait for more later
- else:
- state.got_exception_errno = errno
- state.must_reraise_exception(_reraise_from_errno)
- return
- for i in range(nfds):
- event = evs[i]
- fd = rffi.cast(lltype.Signed, event.c_data.c_fd)
- PendingCallback(self.w_callback, fd, event.c_events).register()
+ evs = lltype.malloc(rffi.CArray(epoll_event), maxevents, flavor='raw')
+ try:
+ self.wait_and_process_events(evs, maxevents)
+ finally:
+ lltype.free(evs, flavor='raw')
+
+ def wait_and_process_events(self, evs, maxevents):
+ fd = rffi.cast(rffi.INT, self.epoller.epfd)
+ maxevents = rffi.cast(rffi.INT, maxevents)
+ timeout = rffi.cast(rffi.INT, 500) # for now: half a second
+ nfds = _epoll_wait(fd, evs, maxevents, timeout)
+ nfds = rffi.cast(lltype.Signed, nfds)
+ #
+ if nfds < 0:
+ errno = rposix.get_errno()
+ if errno == EINTR:
+ nfds = 0 # ignore, just wait for more later
+ else:
+ state.got_exception_errno = errno
+ state.must_reraise_exception(_reraise_from_errno)
+ return
+ # We have to allocate new PendingCallback objects, but we can't
+ # allocate anything here because we are not running transactionally.
+ # Workaround for now: run a new tiny transaction just to create
+ # and register these PendingCallback's.
+ self.evs = evs
+ self.nfds = nfds
+ rstm.perform_transaction(EPollPending._add_real_transactions,
+ EPollPending, self)
+
+ @staticmethod
+ def _add_real_transactions(self, retry_counter):
+ evs = self.evs
+ for i in range(self.nfds):
+ event = evs[i]
+ fd = rffi.cast(lltype.Signed, event.c_data.c_fd)
+ PendingCallback(self.w_callback, fd, event.c_events).register()
# re-register myself to epoll_wait() for more
self.register()
diff --git a/pypy/module/transaction/interp_transaction.py b/pypy/module/transaction/interp_transaction.py
--- a/pypy/module/transaction/interp_transaction.py
+++ b/pypy/module/transaction/interp_transaction.py
@@ -3,12 +3,11 @@
from pypy.module.transaction import threadintf
from pypy.module.transaction.fifo import Fifo
from pypy.rlib import rstm
+from pypy.rlib.debug import ll_assert
NUM_THREADS_DEFAULT = 4 # by default
-MAIN_THREAD_ID = 0
-
class State(object):
@@ -22,6 +21,7 @@
self.ll_no_tasks_pending_lock = threadintf.null_ll_lock
self.ll_unfinished_lock = threadintf.null_ll_lock
self.threadobjs = {} # empty during translation
+ self.main_thread_id = 0
self.pending = Fifo()
def _freeze_(self):
@@ -69,11 +69,15 @@
def setvalue(self, value):
id = rstm.thread_id()
- assert id == MAIN_THREAD_ID # should not be used from a transaction
+ if self.main_thread_id == 0:
+ self.main_thread_id = id
+ else:
+ # this should not be used from a transaction
+ assert id == self.main_thread_id
self.threadobjs[id] = value
def getmainthreadvalue(self):
- return self.threadobjs[0]
+ return self.threadobjs[self.main_thread_id]
def getallvalues(self):
return self.threadobjs
@@ -90,28 +94,38 @@
self.num_threads = num
def lock(self):
- # XXX think about the interaction between locks and the GC
+ """Acquire the main lock. This plays a role similar to the GIL
+ in that it must be acquired in order to have the _run_thread()
+ code execute; but it is released around every execution of a
+ transaction."""
threadintf.acquire(self.ll_lock, True)
def unlock(self):
+ """Release the main lock."""
threadintf.release(self.ll_lock)
def lock_no_tasks_pending(self):
+ """This lock is acquired when state.pending.is_empty()."""
threadintf.acquire(self.ll_no_tasks_pending_lock, True)
def unlock_no_tasks_pending(self):
+ """Release the ll_no_tasks_pending_lock."""
threadintf.release(self.ll_no_tasks_pending_lock)
def is_locked_no_tasks_pending(self):
+ """Test ll_no_tasks_pending_lock for debugging."""
just_locked = threadintf.acquire(self.ll_no_tasks_pending_lock, False)
if just_locked:
threadintf.release(self.ll_no_tasks_pending_lock)
return not just_locked
def lock_unfinished(self):
+ """This lock is normally acquired. It is released when all threads
+ are done."""
threadintf.acquire(self.ll_unfinished_lock, True)
def unlock_unfinished(self):
+ """Release ll_unfinished_lock."""
threadintf.release(self.ll_unfinished_lock)
def init_exceptions(self):
@@ -142,6 +156,11 @@
_alloc_nonmovable_ = True
def register(self):
+ """Register this AbstractPending instance in the pending list
+ belonging to the current thread. If called from the main
+ thread, it is the global list. If called from a transaction,
+ it is a thread-local list that will be merged with the global
+ list when the transaction is done."""
ec = state.getvalue()
ec._transaction_pending.append(self)
@@ -192,16 +211,27 @@
state.unlock_no_tasks_pending()
-def _run_thread():
- state.lock()
- rstm.descriptor_init()
+def _setup_thread(_, retry_counter):
+ """Setup a thread. Run as a transaction because it allocates."""
my_thread_id = rstm.thread_id()
my_ec = state.space.createexecutioncontext()
state.add_thread(my_thread_id, my_ec)
+
+
+def _run_thread():
+ """The main function running one of the threads."""
+ # Note that we cannot allocate any object here outside a transaction,
+ # so we need to be very careful.
+ state.lock()
+ rstm.descriptor_init()
+ #
+ rstm.perform_transaction(_setup_thread, AbstractPending, None)
+ my_transactions_pending = state.getvalue()._transaction_pending
#
while True:
if state.pending.is_empty():
- assert state.is_locked_no_tasks_pending()
+ ll_assert(state.is_locked_no_tasks_pending(),
+ "inconsistently unlocked no_tasks_pending")
state.num_waiting_threads += 1
if state.num_waiting_threads == state.num_threads:
state.finished = True
@@ -222,9 +252,9 @@
state.unlock()
pending.run()
state.lock()
- _add_list(my_ec._transaction_pending)
+ _add_list(my_transactions_pending)
#
- state.del_thread(my_thread_id)
+ state.del_thread(rstm.thread_id())
rstm.descriptor_done()
if state.num_waiting_threads == 0: # only the last thread to leave
state.unlock_unfinished()
@@ -240,20 +270,31 @@
assert not state.is_locked_no_tasks_pending()
if state.pending.is_empty():
return
+ #
+ # 'num_waiting_threads' is the number of threads that are currently
+ # waiting for more work to do. When it becomes equal to
+ # 'num_threads' then we are done: we set 'finished' to True and this
+ # causes all threads to leave. Only accessed during a
+ # 'state.lock'-protected region.
state.num_waiting_threads = 0
state.finished = False
+ #
state.running = True
state.init_exceptions()
#
+ # --- start the threads --- don't use the GC here any more! ---
for i in range(state.num_threads):
threadintf.start_new_thread(_run_thread, ())
#
state.lock_unfinished() # wait for all threads to finish
+ # --- done, we can use the GC again ---
#
assert state.num_waiting_threads == 0
assert state.pending.is_empty()
assert state.threadobjs.keys() == [MAIN_THREAD_ID]
assert not state.is_locked_no_tasks_pending()
+ assert state.threadobjects == []
+ state.threadobjects = None
state.running = False
#
# now re-raise the exception that we got in a transaction
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit