Author: Armin Rigo <[email protected]>
Branch: stm-thread
Changeset: r54939:c1ade41faaec
Date: 2012-05-07 19:18 +0200
http://bitbucket.org/pypy/pypy/changeset/c1ade41faaec/
Log: thread.atomic.
diff --git a/pypy/module/thread/__init__.py b/pypy/module/thread/__init__.py
--- a/pypy/module/thread/__init__.py
+++ b/pypy/module/thread/__init__.py
@@ -4,6 +4,7 @@
class Module(MixedModule):
appleveldefs = {
+ 'atomic': 'app_atomic.atomic',
}
interpleveldefs = {
@@ -20,6 +21,8 @@
'LockType': 'os_lock.Lock',
#'_local': 'os_local.Local', # only if 'rweakref'
'error': 'space.fromcache(error.Cache).w_error',
+ '_atomic_enter': 'atomic.atomic_enter',
+ '_atomic_exit': 'atomic.atomic_exit',
}
def __init__(self, space, *args):
diff --git a/pypy/module/thread/app_atomic.py b/pypy/module/thread/app_atomic.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/thread/app_atomic.py
@@ -0,0 +1,7 @@
+import thread
+
+class Atomic(object):
+ __enter__ = thread._atomic_enter
+ __exit__ = thread._atomic_exit
+
+atomic = Atomic()
diff --git a/pypy/module/thread/atomic.py b/pypy/module/thread/atomic.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/thread/atomic.py
@@ -0,0 +1,21 @@
+from pypy.interpreter.error import OperationError
+from pypy.rlib.rstm import increment_atomic, decrement_atomic, is_atomic
+
+def get_w_error(space):
+ from pypy.module.thread import error
+ return space.fromcache(error.Cache).w_error
+
+def atomic_enter(space):
+ if not space.config.translation.stm:
+ raise OperationError(get_w_error(space),
+ space.wrap("atomic.__enter__(): STM not available"))
+ increment_atomic()
+
+def atomic_exit(space, w_ignored1=None, w_ignored2=None, w_ignored3=None):
+ if not space.config.translation.stm:
+ raise OperationError(get_w_error(space),
+ space.wrap("atomic.__exit__(): STM not available"))
+ if not is_atomic():
+ raise OperationError(get_w_error(space),
+ space.wrap("atomic.__exit__(): more exits than enters"))
+ decrement_atomic()
diff --git a/pypy/module/thread/gil.py b/pypy/module/thread/gil.py
--- a/pypy/module/thread/gil.py
+++ b/pypy/module/thread/gil.py
@@ -2,8 +2,6 @@
Global Interpreter Lock.
"""
-... do not import me for now ...
-
# This module adds a global lock to an object space.
# If multiple threads try to execute simultaneously in this space,
# all but one will be blocked. The other threads get a chance to run
diff --git a/pypy/module/thread/test/support.py b/pypy/module/thread/test/support.py
--- a/pypy/module/thread/test/support.py
+++ b/pypy/module/thread/test/support.py
@@ -2,12 +2,12 @@
import time, gc, thread, os
from pypy.conftest import gettestobjspace, option
from pypy.interpreter.gateway import ObjSpace, W_Root, interp2app_temp
-from pypy.module.thread import gil
NORMAL_TIMEOUT = 300.0 # 5 minutes
def waitfor(space, w_condition, delay=1):
+ from pypy.module.thread import gil
adaptivedelay = 0.04
limit = time.time() + delay * NORMAL_TIMEOUT
while time.time() <= limit:
diff --git a/pypy/rlib/rstm.py b/pypy/rlib/rstm.py
--- a/pypy/rlib/rstm.py
+++ b/pypy/rlib/rstm.py
@@ -13,43 +13,53 @@
def is_inevitable():
return we_are_translated() and stmgcintf.StmOperations.is_inevitable()
+def increment_atomic():
+ stmgcintf.StmOperations.add_atomic(+1)
+
+def decrement_atomic():
+ stmgcintf.StmOperations.add_atomic(-1)
+
+def is_atomic():
+ return stmgcintf.StmOperations.get_atomic()
+
def before_external_call():
- e = get_errno()
- llop.stm_stop_transaction(lltype.Void)
- stmgcintf.StmOperations.commit_transaction()
- set_errno(e)
+ if not is_atomic():
+ e = get_errno()
+ llop.stm_stop_transaction(lltype.Void)
+ stmgcintf.StmOperations.commit_transaction()
+ set_errno(e)
before_external_call._dont_reach_me_in_del_ = True
before_external_call._transaction_break_ = True
def after_external_call():
- e = get_errno()
- stmgcintf.StmOperations.begin_inevitable_transaction()
- llop.stm_start_transaction(lltype.Void)
- set_errno(e)
+ if not is_atomic():
+ e = get_errno()
+ stmgcintf.StmOperations.begin_inevitable_transaction()
+ llop.stm_start_transaction(lltype.Void)
+ set_errno(e)
after_external_call._dont_reach_me_in_del_ = True
after_external_call._transaction_break_ = True
def enter_callback_call():
- e = get_errno()
token = stmgcintf.StmOperations.descriptor_init()
- stmgcintf.StmOperations.begin_inevitable_transaction()
if token != 1:
- llop.stm_start_transaction(lltype.Void)
- #else: the StmGCTLS is not built yet. leave it to gc_thread_start()
- set_errno(e)
+ after_external_call()
+ else:
+ ll_assert(not is_atomic(), "new thread: is_atomic() != 0")
+ stmgcintf.StmOperations.begin_inevitable_transaction()
+ # the StmGCTLS is not built yet. leave it to gc_thread_start()
return token
enter_callback_call._dont_reach_me_in_del_ = True
enter_callback_call._transaction_break_ = True
def leave_callback_call(token):
- e = get_errno()
if token != 1:
- llop.stm_stop_transaction(lltype.Void)
- #else: the StmGCTLS is already destroyed, done by gc_thread_die()
- stmgcintf.StmOperations.commit_transaction()
- if token == 1:
+ before_external_call()
+ else:
+ # the StmGCTLS is already destroyed, done by gc_thread_die()
+ # (we don't care if is_atomic() or not, we'll commit now)
+ stmgcintf.StmOperations.commit_transaction()
stmgcintf.StmOperations.descriptor_done()
- set_errno(e)
leave_callback_call._dont_reach_me_in_del_ = True
leave_callback_call._transaction_break_ = True
@@ -58,7 +68,8 @@
@specialize.memo()
def _get_stm_callback(func, argcls):
def _stm_callback(llarg, retry_counter):
- llop.stm_start_transaction(lltype.Void)
+ if not is_atomic():
+ llop.stm_start_transaction(lltype.Void)
llarg = rffi.cast(rclass.OBJECTPTR, llarg)
arg = cast_base_ptr_to_instance(argcls, llarg)
try:
@@ -66,7 +77,8 @@
except:
fatalerror("no exception allowed in stm_callback")
assert 0
- llop.stm_stop_transaction(lltype.Void)
+ if not is_atomic():
+ llop.stm_stop_transaction(lltype.Void)
return res
return _stm_callback
diff --git a/pypy/translator/stm/src_stm/core.c b/pypy/translator/stm/src_stm/core.c
--- a/pypy/translator/stm/src_stm/core.c
+++ b/pypy/translator/stm/src_stm/core.c
@@ -736,12 +736,25 @@
return is_inevitable(d);
}
+static __thread long stm_atomic = 0;
+
+void stm_add_atomic(long delta)
+{
+ stm_atomic += delta;
+}
+
+long stm_get_atomic(void)
+{
+ return stm_atomic;
+}
+
void stm_perform_transaction(long(*callback)(void*, long), void *arg,
void *save_and_restore)
{
jmp_buf _jmpbuf;
long volatile v_counter = 0;
void *volatile saved_value;
+ long volatile v_atomic = stm_atomic;
assert(thread_descriptor->active == 0);
saved_value = *(void**)save_and_restore;
/***/
@@ -752,12 +765,15 @@
long counter, result;
*(void**)save_and_restore = saved_value;
counter = v_counter;
+ stm_atomic = v_atomic;
do
{
v_counter = counter + 1;
- begin_transaction(&_jmpbuf);
+ if (!stm_atomic)
+ begin_transaction(&_jmpbuf);
result = callback(arg, counter);
- stm_commit_transaction();
+ if (!stm_atomic)
+ stm_commit_transaction();
counter = 0;
}
while (result == 1); /* also stops if we got an RPython exception */
diff --git a/pypy/translator/stm/src_stm/et.h b/pypy/translator/stm/src_stm/et.h
--- a/pypy/translator/stm/src_stm/et.h
+++ b/pypy/translator/stm/src_stm/et.h
@@ -29,6 +29,8 @@
long stm_in_transaction(void);
long stm_is_inevitable(void);
+void stm_add_atomic(long);
+long stm_get_atomic(void);
void stm_perform_transaction(long(*)(void*, long), void*, void*);
diff --git a/pypy/translator/stm/stmgcintf.py b/pypy/translator/stm/stmgcintf.py
--- a/pypy/translator/stm/stmgcintf.py
+++ b/pypy/translator/stm/stmgcintf.py
@@ -48,6 +48,8 @@
# C part of the implementation of the pypy.rlib.rstm module
in_transaction = smexternal('stm_in_transaction', [], lltype.Signed)
is_inevitable = smexternal('stm_is_inevitable', [], lltype.Signed)
+ add_atomic = smexternal('stm_add_atomic', [lltype.Signed], lltype.Void)
+ get_atomic = smexternal('stm_get_atomic', [], lltype.Signed)
descriptor_init = smexternal('stm_descriptor_init', [], lltype.Signed)
descriptor_done = smexternal('stm_descriptor_done', [], lltype.Void)
begin_inevitable_transaction = smexternal(
diff --git a/pypy/translator/stm/test/targetdemo2.py b/pypy/translator/stm/test/targetdemo2.py
--- a/pypy/translator/stm/test/targetdemo2.py
+++ b/pypy/translator/stm/test/targetdemo2.py
@@ -70,6 +70,17 @@
self.finished_lock.release()
def run_really(self, retry_counter):
+ if self.value == glob.LENGTH // 2:
+ print "atomic!"
+ assert not rstm.is_atomic()
+ rstm.increment_atomic()
+ assert rstm.is_atomic()
+ if self.value == glob.LENGTH * 2 // 3:
+ print "--------------- done atomic"
+ assert rstm.is_atomic()
+ rstm.decrement_atomic()
+ assert not rstm.is_atomic()
+ #
add_at_end_of_chained_list(glob.anchor, self.value, self.index)
self.value += 1
return int(self.value < glob.LENGTH)
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit