Author: Armin Rigo <ar...@tunes.org> Branch: stm Changeset: r51487:b52f0e138410 Date: 2012-01-19 16:14 +0100 http://bitbucket.org/pypy/pypy/changeset/b52f0e138410/
Log: (bivab, arigo) Wrote the app-level interface for 'transaction', with careful(?) messy locking. diff --git a/pypy/module/transaction/__init__.py b/pypy/module/transaction/__init__.py new file mode 100644 --- /dev/null +++ b/pypy/module/transaction/__init__.py @@ -0,0 +1,20 @@ + +from pypy.interpreter.mixedmodule import MixedModule + +class Module(MixedModule): + """Transaction module. XXX document me + """ + + interpleveldefs = { + 'set_num_threads': 'interp_transaction.set_num_threads', + 'add': 'interp_transaction.add', + 'run': 'interp_transaction.run', + 'TransactionError': 'interp_transaction.state.w_error', + } + + appleveldefs = { + } + + def startup(self, space): + from pypy.module.transaction import interp_transaction + interp_transaction.state.startup(space) diff --git a/pypy/module/transaction/interp_transaction.py b/pypy/module/transaction/interp_transaction.py new file mode 100644 --- /dev/null +++ b/pypy/module/transaction/interp_transaction.py @@ -0,0 +1,138 @@ +from pypy.interpreter.error import OperationError +from pypy.interpreter.gateway import unwrap_spec +from pypy.module.transaction import threadintf + + +NUM_THREADS_DEFAULT = 4 # by default + + +class State(object): + + def _freeze_(self): + self.__dict__.clear() + self.running = False + self.num_threads = NUM_THREADS_DEFAULT + + def startup(self, space): + self.space = space + self.pending = [] + self.ll_lock = threadintf.allocate_lock() + self.ll_no_tasks_pending_lock = threadintf.allocate_lock() + self.ll_unfinished_lock = threadintf.allocate_lock() + self.w_error = space.new_exception_class( + "transaction.TransactionError") + self.lock_no_tasks_pending() + self.lock_unfinished() + + def set_num_threads(self, num): + if self.running: + space = self.space + raise OperationError(space.w_ValueError, + space.wrap("cannot change the number of " + "threads when transaction.run() " + "is active")) + self.num_threads = num_threads + + def lock(self): + # XXX think about the interaction between locks and the GC + threadintf.acquire(self.ll_lock, True) + + def unlock(self): + threadintf.release(self.ll_lock) + + def lock_no_tasks_pending(self): + threadintf.acquire(self.ll_no_tasks_pending_lock, True) + + def unlock_no_tasks_pending(self): + threadintf.release(self.ll_no_tasks_pending_lock) + + def assert_locked_no_tasks_pending(self): + just_locked = threadintf.acquire(self.ll_no_tasks_pending_lock, False) + assert not just_locked + + def lock_unfinished(self): + threadintf.acquire(self.ll_unfinished_lock, True) + + def unlock_unfinished(self): + threadintf.release(self.ll_unfinished_lock) + + +state = State() +state._freeze_() + + +@unwrap_spec(num=int) +def set_num_threads(space, num): + if num < 1: + num = 1 + state.set_num_threads(num) + + +class Pending: + def __init__(self, w_callback, args): + self.w_callback = w_callback + self.args = args + + def run(self): + space = state.space + space.call_args(self.w_callback, self.args) + # xxx exceptions? + + +def add(space, w_callback, __args__): + state.lock() + was_empty = len(state.pending) == 0 + state.pending.append(Pending(w_callback, __args__)) + if was_empty: + state.unlock_no_tasks_pending() + state.unlock() + + +def _run_thread(): + state.lock() + # + while True: + if len(state.pending) == 0: + state.assert_locked_no_tasks_pending() + state.num_waiting_threads += 1 + if state.num_waiting_threads == state.num_threads: + state.finished = True + state.unlock_unfinished() + state.unlock_no_tasks_pending() + state.unlock() + # + state.lock_no_tasks_pending() + state.unlock_no_tasks_pending() + # + state.lock() + if state.finished: + break + state.num_waiting_threads -= 1 + else: + pending = state.pending.pop(0) + if len(state.pending) == 0: + state.lock_no_tasks_pending() + state.unlock() + pending.run() + state.lock() + # + state.unlock() + + +def run(space): + if state.running: + raise OperationError( + state.w_error, + space.wrap("recursive invocation of transaction.run()")) + state.num_waiting_threads = 0 + state.finished = False + state.running = True + # + for i in range(state.num_threads): + threadintf.start_new_thread(_run_thread, ()) + # + state.lock_unfinished() + assert state.num_waiting_threads == state.num_threads + assert len(state.pending) == 0 + state.lock_no_tasks_pending() + state.running = False diff --git a/pypy/module/transaction/test/__init__.py b/pypy/module/transaction/test/__init__.py new file mode 100644 diff --git a/pypy/module/transaction/test/test_interp_transaction.py b/pypy/module/transaction/test/test_interp_transaction.py new file mode 100644 --- /dev/null +++ b/pypy/module/transaction/test/test_interp_transaction.py @@ -0,0 +1,24 @@ +from pypy.module.transaction import interp_transaction + + +class FakeSpace: + def new_exception_class(self, name): + return "some error class" + def call_args(self, w_callback, args): + w_callback(*args) + + +def test_linear_list(): + space = FakeSpace() + interp_transaction.state.startup(space) + seen = [] + # + def do(n): + seen.append(n) + if n < 200: + interp_transaction.add(space, do, (n+1,)) + # + interp_transaction.add(space, do, (0,)) + assert seen == [] + interp_transaction.run(space) + assert seen == range(201) diff --git a/pypy/module/transaction/test/test_transaction.py b/pypy/module/transaction/test/test_transaction.py new file mode 100644 --- /dev/null +++ b/pypy/module/transaction/test/test_transaction.py @@ -0,0 +1,16 @@ +import py +from pypy.conftest import gettestobjspace + + +class AppTestTransaction: + def setup_class(cls): + cls.space = gettestobjspace(usemodules=['transaction']) + + def test_simple(self): + import transaction + lst = [] + transaction.add(lst.append, 5) + transaction.add(lst.append, 6) + transaction.add(lst.append, 7) + transaction.run() + assert sorted(lst) == [5, 6, 7] diff --git a/pypy/module/transaction/threadintf.py b/pypy/module/transaction/threadintf.py new file mode 100644 --- /dev/null +++ b/pypy/module/transaction/threadintf.py @@ -0,0 +1,19 @@ +import thread + + +def allocate_lock(): + "NOT_RPYTHON" + return thread.allocate_lock() + +def acquire(lock, wait): + "NOT_RPYTHON" + lock.acquire(wait) + +def release(lock): + "NOT_RPYTHON" + lock.release() + +def start_new_thread(callback, args): + "NOT_RPYTHON" + thread.start_new_thread(callback, args) + _______________________________________________ pypy-commit mailing list pypy-commit@python.org http://mail.python.org/mailman/listinfo/pypy-commit