Author: Armin Rigo <[email protected]>
Branch: stm
Changeset: r51692:e5329461f8ca
Date: 2012-01-23 17:15 +0100
http://bitbucket.org/pypy/pypy/changeset/e5329461f8ca/
Log: Make the ExecutionContext a thread-local again. See comments.
diff --git a/pypy/module/transaction/__init__.py
b/pypy/module/transaction/__init__.py
--- a/pypy/module/transaction/__init__.py
+++ b/pypy/module/transaction/__init__.py
@@ -15,12 +15,13 @@
'TransactionError': 'app_transaction.TransactionError',
}
+ def __init__(self, space, *args):
+ "NOT_RPYTHON: patches space.threadlocals to use real threadlocals"
+ from pypy.module.transaction import interp_transaction
+ MixedModule.__init__(self, space, *args)
+ interp_transaction.state.initialize(space)
+ space.threadlocals = interp_transaction.state
+
def startup(self, space):
from pypy.module.transaction import interp_transaction
- interp_transaction.state.startup(space)
-
- def translating_for_checkmodule(self, space):
- from pypy.module.transaction import interp_transaction
- interp_transaction.state._freeze_()
- interp_transaction.state.space = space
- interp_transaction.state._freeze_ = lambda: None # hack!
+ interp_transaction.state.startup(space, space.wrap(self))
diff --git a/pypy/module/transaction/interp_transaction.py
b/pypy/module/transaction/interp_transaction.py
--- a/pypy/module/transaction/interp_transaction.py
+++ b/pypy/module/transaction/interp_transaction.py
@@ -12,27 +12,73 @@
class State(object):
- def _freeze_(self):
- self.__dict__.clear()
+ def initialize(self, space):
+ self.space = space
self.running = False
self.num_threads = NUM_THREADS_DEFAULT
- self.pending = Fifo()
- self.pending_lists = {0: self.pending}
+ #
+ self.w_error = None
self.ll_lock = threadintf.null_ll_lock
self.ll_no_tasks_pending_lock = threadintf.null_ll_lock
self.ll_unfinished_lock = threadintf.null_ll_lock
- self.w_error = None
+ self.threadobjs = {} # empty during translation
+ self.pending = Fifo()
- def startup(self, space):
- self.space = space
- w_module = space.getbuiltinmodule('transaction')
- self.w_error = space.getattr(w_module, space.wrap('TransactionError'))
- #
+ def _freeze_(self):
+ self.threadobjs.clear()
+ return False
+
+ def startup(self, space, w_module):
+ assert space is self.space
+ if w_module is not None: # for tests
+ self.w_error = space.getattr(w_module,
+ space.wrap('TransactionError'))
self.ll_lock = threadintf.allocate_lock()
self.ll_no_tasks_pending_lock = threadintf.allocate_lock()
self.ll_unfinished_lock = threadintf.allocate_lock()
self.lock_unfinished()
- self.pending_lists = {MAIN_THREAD_ID: self.pending}
+ self.startup_run()
+
+ def startup_run(self):
+ # this is called at the start of run() too, in order to make
+ # test_checkmodule happy
+ main_ec = self.space.getexecutioncontext() # create it if needed
+ main_ec._transaction_pending = self.pending
+
+ def add_thread(self, id, ec):
+ # register a new transaction thread
+ assert id not in self.threadobjs
+ ec._transaction_pending = Fifo()
+ self.threadobjs[id] = ec
+
+ def del_thread(self, id):
+ # un-register a transaction thread
+ del self.threadobjs[id]
+
+ # ---------- interface for ThreadLocals ----------
+ # This really works like a thread-local, which may have slightly
+ # strange consequences across multiple transactions, because you
+ # don't know on which thread a transaction will run. The point is
+ # to let every thread get its own ExecutionContext; otherwise the
+ # threads conflict with each other, e.g. when setting the
+ # 'topframeref' attribute.
+
+ def getvalue(self):
+ id = rstm.thread_id()
+ return self.threadobjs.get(id, None)
+
+ def setvalue(self, value):
+ id = rstm.thread_id()
+ assert id == MAIN_THREAD_ID # should not be used from a transaction
+ self.threadobjs[id] = value
+
+ def getmainthreadvalue(self):
+ return self.threadobjs[0]
+
+ def getallvalues(self):
+ return self.threadobjs
+
+ # ----------
def set_num_threads(self, num):
if self.running:
@@ -70,7 +116,6 @@
state = State()
-state._freeze_()
@unwrap_spec(num=int)
@@ -88,8 +133,8 @@
self.args = args
def register(self):
- id = rstm.thread_id()
- state.pending_lists[id].append(self)
+ ec = state.getvalue()
+ ec._transaction_pending.append(self)
def run(self):
rstm.perform_transaction(Pending._run_in_transaction, Pending, self)
@@ -124,10 +169,9 @@
def _run_thread():
state.lock()
rstm.descriptor_init()
- my_pending_list = Fifo()
my_thread_id = rstm.thread_id()
- assert my_thread_id not in state.pending_lists
- state.pending_lists[my_thread_id] = my_pending_list
+ my_ec = state.space.createexecutioncontext()
+ state.add_thread(my_thread_id, my_ec)
#
while True:
if state.pending.is_empty():
@@ -152,10 +196,10 @@
state.unlock()
pending.run()
state.lock()
- _add_list(my_pending_list)
+ _add_list(my_ec._transaction_pending)
#
+ state.del_thread(my_thread_id)
rstm.descriptor_done()
- del state.pending_lists[my_thread_id]
if state.num_waiting_threads == 0: # only the last thread to leave
state.unlock_unfinished()
state.unlock()
@@ -166,6 +210,7 @@
raise OperationError(
state.w_error,
space.wrap("recursive invocation of transaction.run()"))
+ state.startup_run()
assert not state.is_locked_no_tasks_pending()
if state.pending.is_empty():
return
@@ -181,7 +226,7 @@
#
assert state.num_waiting_threads == 0
assert state.pending.is_empty()
- assert state.pending_lists.keys() == [MAIN_THREAD_ID]
+ assert state.threadobjs.keys() == [MAIN_THREAD_ID]
assert not state.is_locked_no_tasks_pending()
state.running = False
#
diff --git a/pypy/module/transaction/test/test_interp_transaction.py
b/pypy/module/transaction/test/test_interp_transaction.py
--- a/pypy/module/transaction/test/test_interp_transaction.py
+++ b/pypy/module/transaction/test/test_interp_transaction.py
@@ -11,13 +11,29 @@
return 'some stuff from the transaction module'
def wrap(self, x):
return 'wrapped stuff'
+ def getexecutioncontext(self):
+ ec = interp_transaction.state.getvalue()
+ if ec is None:
+ ec = self.createexecutioncontext()
+ interp_transaction.state.setvalue(ec)
+ return ec
+ def createexecutioncontext(self):
+ return FakeEC()
def call_args(self, w_callback, args):
w_callback(*args)
+class FakeEC:
+ pass
+
+def make_fake_space():
+ space = FakeSpace()
+ interp_transaction.state.initialize(space)
+ interp_transaction.state.startup(space, None)
+ return space
+
def test_linear_list():
- space = FakeSpace()
- interp_transaction.state.startup(space)
+ space = make_fake_space()
seen = []
#
def do(n):
@@ -32,8 +48,7 @@
def test_tree_of_transactions():
- space = FakeSpace()
- interp_transaction.state.startup(space)
+ space = make_fake_space()
seen = []
#
def do(level):
@@ -51,8 +66,7 @@
def test_transactional_simple():
- space = FakeSpace()
- interp_transaction.state.startup(space)
+ space = make_fake_space()
lst = []
def f(n):
lst.append(n+0)
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit