Author: Armin Rigo <[email protected]>
Branch: stm
Changeset: r51692:e5329461f8ca
Date: 2012-01-23 17:15 +0100
http://bitbucket.org/pypy/pypy/changeset/e5329461f8ca/

Log:    Make the ExecutionContext again a thread-local. See comments.

diff --git a/pypy/module/transaction/__init__.py 
b/pypy/module/transaction/__init__.py
--- a/pypy/module/transaction/__init__.py
+++ b/pypy/module/transaction/__init__.py
@@ -15,12 +15,13 @@
         'TransactionError': 'app_transaction.TransactionError',
     }
 
+    def __init__(self, space, *args):
+        "NOT_RPYTHON: patches space.threadlocals to use real threadlocals"
+        from pypy.module.transaction import interp_transaction
+        MixedModule.__init__(self, space, *args)
+        interp_transaction.state.initialize(space)
+        space.threadlocals = interp_transaction.state
+
     def startup(self, space):
         from pypy.module.transaction import interp_transaction
-        interp_transaction.state.startup(space)
-
-    def translating_for_checkmodule(self, space):
-        from pypy.module.transaction import interp_transaction
-        interp_transaction.state._freeze_()
-        interp_transaction.state.space = space
-        interp_transaction.state._freeze_ = lambda: None   # hack!
+        interp_transaction.state.startup(space, space.wrap(self))
diff --git a/pypy/module/transaction/interp_transaction.py 
b/pypy/module/transaction/interp_transaction.py
--- a/pypy/module/transaction/interp_transaction.py
+++ b/pypy/module/transaction/interp_transaction.py
@@ -12,27 +12,73 @@
 
 class State(object):
 
-    def _freeze_(self):
-        self.__dict__.clear()
+    def initialize(self, space):
+        self.space = space
         self.running = False
         self.num_threads = NUM_THREADS_DEFAULT
-        self.pending = Fifo()
-        self.pending_lists = {0: self.pending}
+        #
+        self.w_error = None
         self.ll_lock = threadintf.null_ll_lock
         self.ll_no_tasks_pending_lock = threadintf.null_ll_lock
         self.ll_unfinished_lock = threadintf.null_ll_lock
-        self.w_error = None
+        self.threadobjs = {}      # empty during translation
+        self.pending = Fifo()
 
-    def startup(self, space):
-        self.space = space
-        w_module = space.getbuiltinmodule('transaction')
-        self.w_error = space.getattr(w_module, space.wrap('TransactionError'))
-        #
+    def _freeze_(self):
+        self.threadobjs.clear()
+        return False
+
+    def startup(self, space, w_module):
+        assert space is self.space
+        if w_module is not None:     # for tests
+            self.w_error = space.getattr(w_module,
+                                         space.wrap('TransactionError'))
         self.ll_lock = threadintf.allocate_lock()
         self.ll_no_tasks_pending_lock = threadintf.allocate_lock()
         self.ll_unfinished_lock = threadintf.allocate_lock()
         self.lock_unfinished()
-        self.pending_lists = {MAIN_THREAD_ID: self.pending}
+        self.startup_run()
+
+    def startup_run(self):
+        # this is called at the start of run() too, in order to make
+        # test_checkmodule happy
+        main_ec = self.space.getexecutioncontext()    # create it if needed
+        main_ec._transaction_pending = self.pending
+
+    def add_thread(self, id, ec):
+        # register a new transaction thread
+        assert id not in self.threadobjs
+        ec._transaction_pending = Fifo()
+        self.threadobjs[id] = ec
+
+    def del_thread(self, id):
+        # un-register a transaction thread
+        del self.threadobjs[id]
+
+    # ---------- interface for ThreadLocals ----------
+    # This works really like a thread-local, which may have slightly
+    # strange consequences in multiple transactions, because you don't
+    # know on which thread a transaction will run.  The point of this is
+    # to let every thread get its own ExecutionContext; otherwise, they
+    # conflict with each other e.g. when setting the 'topframeref'
+    # attribute.
+
+    def getvalue(self):
+        id = rstm.thread_id()
+        return self.threadobjs.get(id, None)
+
+    def setvalue(self, value):
+        id = rstm.thread_id()
+        assert id == MAIN_THREAD_ID   # should not be used from a transaction
+        self.threadobjs[id] = value
+
+    def getmainthreadvalue(self):
+        return self.threadobjs[0]
+
+    def getallvalues(self):
+        return self.threadobjs
+
+    # ----------
 
     def set_num_threads(self, num):
         if self.running:
@@ -70,7 +116,6 @@
 
 
 state = State()
-state._freeze_()
 
 
 @unwrap_spec(num=int)
@@ -88,8 +133,8 @@
         self.args = args
 
     def register(self):
-        id = rstm.thread_id()
-        state.pending_lists[id].append(self)
+        ec = state.getvalue()
+        ec._transaction_pending.append(self)
 
     def run(self):
         rstm.perform_transaction(Pending._run_in_transaction, Pending, self)
@@ -124,10 +169,9 @@
 def _run_thread():
     state.lock()
     rstm.descriptor_init()
-    my_pending_list = Fifo()
     my_thread_id = rstm.thread_id()
-    assert my_thread_id not in state.pending_lists
-    state.pending_lists[my_thread_id] = my_pending_list
+    my_ec = state.space.createexecutioncontext()
+    state.add_thread(my_thread_id, my_ec)
     #
     while True:
         if state.pending.is_empty():
@@ -152,10 +196,10 @@
             state.unlock()
             pending.run()
             state.lock()
-            _add_list(my_pending_list)
+            _add_list(my_ec._transaction_pending)
     #
+    state.del_thread(my_thread_id)
     rstm.descriptor_done()
-    del state.pending_lists[my_thread_id]
     if state.num_waiting_threads == 0:    # only the last thread to leave
         state.unlock_unfinished()
     state.unlock()
@@ -166,6 +210,7 @@
         raise OperationError(
             state.w_error,
             space.wrap("recursive invocation of transaction.run()"))
+    state.startup_run()
     assert not state.is_locked_no_tasks_pending()
     if state.pending.is_empty():
         return
@@ -181,7 +226,7 @@
     #
     assert state.num_waiting_threads == 0
     assert state.pending.is_empty()
-    assert state.pending_lists.keys() == [MAIN_THREAD_ID]
+    assert state.threadobjs.keys() == [MAIN_THREAD_ID]
     assert not state.is_locked_no_tasks_pending()
     state.running = False
     #
diff --git a/pypy/module/transaction/test/test_interp_transaction.py 
b/pypy/module/transaction/test/test_interp_transaction.py
--- a/pypy/module/transaction/test/test_interp_transaction.py
+++ b/pypy/module/transaction/test/test_interp_transaction.py
@@ -11,13 +11,29 @@
         return 'some stuff from the transaction module'
     def wrap(self, x):
         return 'wrapped stuff'
+    def getexecutioncontext(self):
+        ec = interp_transaction.state.getvalue()
+        if ec is None:
+            ec = self.createexecutioncontext()
+            interp_transaction.state.setvalue(ec)
+        return ec
+    def createexecutioncontext(self):
+        return FakeEC()
     def call_args(self, w_callback, args):
         w_callback(*args)
 
+class FakeEC:
+    pass
+
+def make_fake_space():
+    space = FakeSpace()
+    interp_transaction.state.initialize(space)
+    interp_transaction.state.startup(space, None)
+    return space
+
 
 def test_linear_list():
-    space = FakeSpace()
-    interp_transaction.state.startup(space)
+    space = make_fake_space()
     seen = []
     #
     def do(n):
@@ -32,8 +48,7 @@
 
 
 def test_tree_of_transactions():
-    space = FakeSpace()
-    interp_transaction.state.startup(space)
+    space = make_fake_space()
     seen = []
     #
     def do(level):
@@ -51,8 +66,7 @@
 
 
 def test_transactional_simple():
-    space = FakeSpace()
-    interp_transaction.state.startup(space)
+    space = make_fake_space()
     lst = []
     def f(n):
         lst.append(n+0)
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to