Author: Armin Rigo <[email protected]>
Branch: stm-thread
Changeset: r54939:c1ade41faaec
Date: 2012-05-07 19:18 +0200
http://bitbucket.org/pypy/pypy/changeset/c1ade41faaec/

Log:    thread.atomic.

diff --git a/pypy/module/thread/__init__.py b/pypy/module/thread/__init__.py
--- a/pypy/module/thread/__init__.py
+++ b/pypy/module/thread/__init__.py
@@ -4,6 +4,7 @@
 
 class Module(MixedModule):
     appleveldefs = {
+        'atomic':                 'app_atomic.atomic',
     }
 
     interpleveldefs = {
@@ -20,6 +21,8 @@
         'LockType':               'os_lock.Lock',
         #'_local':                'os_local.Local',   # only if 'rweakref'
         'error':                  'space.fromcache(error.Cache).w_error',
+        '_atomic_enter':          'atomic.atomic_enter',
+        '_atomic_exit':           'atomic.atomic_exit',
     }
 
     def __init__(self, space, *args):
diff --git a/pypy/module/thread/app_atomic.py b/pypy/module/thread/app_atomic.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/thread/app_atomic.py
@@ -0,0 +1,7 @@
+import thread
+
+class Atomic(object):
+    __enter__ = thread._atomic_enter
+    __exit__  = thread._atomic_exit
+
+atomic = Atomic()
diff --git a/pypy/module/thread/atomic.py b/pypy/module/thread/atomic.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/thread/atomic.py
@@ -0,0 +1,21 @@
+from pypy.interpreter.error import OperationError
+from pypy.rlib.rstm import increment_atomic, decrement_atomic, is_atomic
+
+def get_w_error(space):
+    from pypy.module.thread import error
+    return space.fromcache(error.Cache).w_error
+
+def atomic_enter(space):
+    if not space.config.translation.stm:
+        raise OperationError(get_w_error(space),
+            space.wrap("atomic.__enter__(): STM not available"))
+    increment_atomic()
+
+def atomic_exit(space, w_ignored1=None, w_ignored2=None, w_ignored3=None):
+    if not space.config.translation.stm:
+        raise OperationError(get_w_error(space),
+            space.wrap("atomic.__exit__(): STM not available"))
+    if not is_atomic():
+        raise OperationError(get_w_error(space),
+            space.wrap("atomic.__exit__(): more exits than enters"))
+    decrement_atomic()
diff --git a/pypy/module/thread/gil.py b/pypy/module/thread/gil.py
--- a/pypy/module/thread/gil.py
+++ b/pypy/module/thread/gil.py
@@ -2,8 +2,6 @@
 Global Interpreter Lock.
 """
 
-... do not import me for now ...
-
 # This module adds a global lock to an object space.
 # If multiple threads try to execute simultaneously in this space,
 # all but one will be blocked.  The other threads get a chance to run
diff --git a/pypy/module/thread/test/support.py b/pypy/module/thread/test/support.py
--- a/pypy/module/thread/test/support.py
+++ b/pypy/module/thread/test/support.py
@@ -2,12 +2,12 @@
 import time, gc, thread, os
 from pypy.conftest import gettestobjspace, option
 from pypy.interpreter.gateway import ObjSpace, W_Root, interp2app_temp
-from pypy.module.thread import gil
 
 
 NORMAL_TIMEOUT = 300.0   # 5 minutes
 
 def waitfor(space, w_condition, delay=1):
+    from pypy.module.thread import gil
     adaptivedelay = 0.04
     limit = time.time() + delay * NORMAL_TIMEOUT
     while time.time() <= limit:
diff --git a/pypy/rlib/rstm.py b/pypy/rlib/rstm.py
--- a/pypy/rlib/rstm.py
+++ b/pypy/rlib/rstm.py
@@ -13,43 +13,53 @@
 def is_inevitable():
     return we_are_translated() and stmgcintf.StmOperations.is_inevitable()
 
+def increment_atomic():
+    stmgcintf.StmOperations.add_atomic(+1)
+
+def decrement_atomic():
+    stmgcintf.StmOperations.add_atomic(-1)
+
+def is_atomic():
+    return stmgcintf.StmOperations.get_atomic()
+
 def before_external_call():
-    e = get_errno()
-    llop.stm_stop_transaction(lltype.Void)
-    stmgcintf.StmOperations.commit_transaction()
-    set_errno(e)
+    if not is_atomic():
+        e = get_errno()
+        llop.stm_stop_transaction(lltype.Void)
+        stmgcintf.StmOperations.commit_transaction()
+        set_errno(e)
 before_external_call._dont_reach_me_in_del_ = True
 before_external_call._transaction_break_ = True
 
 def after_external_call():
-    e = get_errno()
-    stmgcintf.StmOperations.begin_inevitable_transaction()
-    llop.stm_start_transaction(lltype.Void)
-    set_errno(e)
+    if not is_atomic():
+        e = get_errno()
+        stmgcintf.StmOperations.begin_inevitable_transaction()
+        llop.stm_start_transaction(lltype.Void)
+        set_errno(e)
 after_external_call._dont_reach_me_in_del_ = True
 after_external_call._transaction_break_ = True
 
 def enter_callback_call():
-    e = get_errno()
     token = stmgcintf.StmOperations.descriptor_init()
-    stmgcintf.StmOperations.begin_inevitable_transaction()
     if token != 1:
-        llop.stm_start_transaction(lltype.Void)
-    #else: the StmGCTLS is not built yet.  leave it to gc_thread_start()
-    set_errno(e)
+        after_external_call()
+    else:
+        ll_assert(not is_atomic(), "new thread: is_atomic() != 0")
+        stmgcintf.StmOperations.begin_inevitable_transaction()
+        # the StmGCTLS is not built yet.  leave it to gc_thread_start()
     return token
 enter_callback_call._dont_reach_me_in_del_ = True
 enter_callback_call._transaction_break_ = True
 
 def leave_callback_call(token):
-    e = get_errno()
     if token != 1:
-        llop.stm_stop_transaction(lltype.Void)
-    #else: the StmGCTLS is already destroyed, done by gc_thread_die()
-    stmgcintf.StmOperations.commit_transaction()
-    if token == 1:
+        before_external_call()
+    else:
+        # the StmGCTLS is already destroyed, done by gc_thread_die()
+        # (we don't care if is_atomic() or not, we'll commit now)
+        stmgcintf.StmOperations.commit_transaction()
         stmgcintf.StmOperations.descriptor_done()
-    set_errno(e)
 leave_callback_call._dont_reach_me_in_del_ = True
 leave_callback_call._transaction_break_ = True
 
@@ -58,7 +68,8 @@
 @specialize.memo()
 def _get_stm_callback(func, argcls):
     def _stm_callback(llarg, retry_counter):
-        llop.stm_start_transaction(lltype.Void)
+        if not is_atomic():
+            llop.stm_start_transaction(lltype.Void)
         llarg = rffi.cast(rclass.OBJECTPTR, llarg)
         arg = cast_base_ptr_to_instance(argcls, llarg)
         try:
@@ -66,7 +77,8 @@
         except:
             fatalerror("no exception allowed in stm_callback")
             assert 0
-        llop.stm_stop_transaction(lltype.Void)
+        if not is_atomic():
+            llop.stm_stop_transaction(lltype.Void)
         return res
     return _stm_callback
 
diff --git a/pypy/translator/stm/src_stm/core.c b/pypy/translator/stm/src_stm/core.c
--- a/pypy/translator/stm/src_stm/core.c
+++ b/pypy/translator/stm/src_stm/core.c
@@ -736,12 +736,25 @@
   return is_inevitable(d);
 }
 
+static __thread long stm_atomic = 0;
+
+void stm_add_atomic(long delta)
+{
+    stm_atomic += delta;
+}
+
+long stm_get_atomic(void)
+{
+    return stm_atomic;
+}
+
 void stm_perform_transaction(long(*callback)(void*, long), void *arg,
                              void *save_and_restore)
 {
   jmp_buf _jmpbuf;
   long volatile v_counter = 0;
   void *volatile saved_value;
+  long volatile v_atomic = stm_atomic;
   assert(thread_descriptor->active == 0);
   saved_value = *(void**)save_and_restore;
   /***/
@@ -752,12 +765,15 @@
   long counter, result;
   *(void**)save_and_restore = saved_value;
   counter = v_counter;
+  stm_atomic = v_atomic;
   do
     {
       v_counter = counter + 1;
-      begin_transaction(&_jmpbuf);
+      if (!stm_atomic)
+        begin_transaction(&_jmpbuf);
       result = callback(arg, counter);
-      stm_commit_transaction();
+      if (!stm_atomic)
+        stm_commit_transaction();
       counter = 0;
     }
   while (result == 1);  /* also stops if we got an RPython exception */
diff --git a/pypy/translator/stm/src_stm/et.h b/pypy/translator/stm/src_stm/et.h
--- a/pypy/translator/stm/src_stm/et.h
+++ b/pypy/translator/stm/src_stm/et.h
@@ -29,6 +29,8 @@
 
 long stm_in_transaction(void);
 long stm_is_inevitable(void);
+void stm_add_atomic(long);
+long stm_get_atomic(void);
 
 void stm_perform_transaction(long(*)(void*, long), void*, void*);
 
diff --git a/pypy/translator/stm/stmgcintf.py b/pypy/translator/stm/stmgcintf.py
--- a/pypy/translator/stm/stmgcintf.py
+++ b/pypy/translator/stm/stmgcintf.py
@@ -48,6 +48,8 @@
     # C part of the implementation of the pypy.rlib.rstm module
     in_transaction = smexternal('stm_in_transaction', [], lltype.Signed)
     is_inevitable = smexternal('stm_is_inevitable', [], lltype.Signed)
+    add_atomic = smexternal('stm_add_atomic', [lltype.Signed], lltype.Void)
+    get_atomic = smexternal('stm_get_atomic', [], lltype.Signed)
     descriptor_init = smexternal('stm_descriptor_init', [], lltype.Signed)
     descriptor_done = smexternal('stm_descriptor_done', [], lltype.Void)
     begin_inevitable_transaction = smexternal(
diff --git a/pypy/translator/stm/test/targetdemo2.py b/pypy/translator/stm/test/targetdemo2.py
--- a/pypy/translator/stm/test/targetdemo2.py
+++ b/pypy/translator/stm/test/targetdemo2.py
@@ -70,6 +70,17 @@
             self.finished_lock.release()
 
     def run_really(self, retry_counter):
+        if self.value == glob.LENGTH // 2:
+            print "atomic!"
+            assert not rstm.is_atomic()
+            rstm.increment_atomic()
+            assert rstm.is_atomic()
+        if self.value == glob.LENGTH * 2 // 3:
+            print "--------------- done atomic"
+            assert rstm.is_atomic()
+            rstm.decrement_atomic()
+            assert not rstm.is_atomic()
+        #
         add_at_end_of_chained_list(glob.anchor, self.value, self.index)
         self.value += 1
         return int(self.value < glob.LENGTH)
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to