Author: Remi Meier
Branch: c7
Changeset: r687:58ad71386b61
Date: 2014-01-29 10:36 +0100
http://bitbucket.org/pypy/stmgc/changeset/58ad71386b61/

Log:    implement inevitable transactions

diff --git a/c7/core.c b/c7/core.c
--- a/c7/core.c
+++ b/c7/core.c
@@ -22,7 +22,7 @@
 char *object_pages;
 static int num_threads_started;
 uint8_t write_locks[READMARKER_END - READMARKER_START];
-
+volatile uint8_t inevitable_lock;
 
 struct _thread_local1_s* _stm_dbg_get_tl(int thread)
 {
@@ -125,6 +125,7 @@
     uintptr_t lock_idx = (((uintptr_t)obj) >> 4) - READMARKER_START;
     uint8_t lock_num = _STM_TL->thread_num + 1;
     uint8_t prev_owner;
+ retry:
     do {
         prev_owner = __sync_val_compare_and_swap(&write_locks[lock_idx],
                                                0, lock_num);
@@ -132,7 +133,16 @@
         /* if there was no lock-holder or we already have the lock */
         if ((!prev_owner) || (prev_owner == lock_num))
             break;
-        
+
+        if (_STM_TL->active == 2) {
+            /* we must succeed! */
+            _stm_dbg_get_tl(prev_owner - 1)->need_abort = 1;
+            _stm_start_no_collect_safe_point();
+            /* XXX: not good */
+            usleep(1);
+            _stm_stop_no_collect_safe_point();
+            goto retry;
+        }
         /* XXXXXX */
         //_stm_start_semi_safe_point();
         //usleep(1);
@@ -161,6 +171,8 @@
 {
     _stm_reset_shared_lock();
     _stm_reset_pages();
+
+    inevitable_lock = 0;
     
     /* Check that some values are acceptable */
     assert(4096 <= ((uintptr_t)_STM_TL));
@@ -259,12 +271,12 @@
     
     _STM_TL->modified_objects = stm_list_create();
     _STM_TL->uncommitted_objects = stm_list_create();
-    assert(!_STM_TL->running_transaction);
+    assert(!_STM_TL->active);
 }
 
 bool _stm_is_in_transaction(void)
 {
-    return _STM_TL->running_transaction;
+    return _STM_TL->active;
 }
 
 void _stm_teardown_thread(void)
@@ -288,6 +300,7 @@
 
 void _stm_teardown(void)
 {
+    assert(inevitable_lock == 0);
     munmap(object_pages, TOTAL_MEMORY);
     _stm_reset_pages();
     memset(write_locks, 0, sizeof(write_locks));
@@ -330,9 +343,46 @@
 }
 
 
+void stm_become_inevitable(char* msg)
+{
+    if (_STM_TL->active == 2)
+        return;
+    assert(_STM_TL->active == 1);
+
+    uint8_t our_lock = _STM_TL->thread_num + 1;
+    do {
+        _stm_start_safe_point();
+
+        stm_start_exclusive_lock();
+        if (_STM_TL->need_abort) {
+            stm_stop_exclusive_lock();
+            stm_start_shared_lock();
+            stm_abort_transaction();
+        }
+
+        if (!inevitable_lock)
+            break;
+
+        stm_stop_exclusive_lock();
+        _stm_stop_safe_point();
+    } while (1);
+
+    inevitable_lock = our_lock;
+    _STM_TL->active = 2;
+    stm_stop_exclusive_lock();
+    
+    _stm_stop_safe_point();
+}
+
+void stm_start_inevitable_transaction()
+{
+    stm_start_transaction(NULL);
+    stm_become_inevitable("stm_start_inevitable_transaction");
+}
+
 void stm_start_transaction(jmpbufptr_t *jmpbufptr)
 {
-    assert(!_STM_TL->running_transaction);
+    assert(!_STM_TL->active);
 
     stm_start_shared_lock();
     
@@ -346,14 +396,14 @@
     nursery_on_start();
     
     _STM_TL->jmpbufptr = jmpbufptr;
-    _STM_TL->running_transaction = 1;
+    _STM_TL->active = 1;
     _STM_TL->need_abort = 0;
 }
 
 
 void stm_stop_transaction(void)
 {
-    assert(_STM_TL->running_transaction);
+    assert(_STM_TL->active);
 
     /* do the minor_collection here and not in nursery_on_commit,
        since here we can still run concurrently with other threads
@@ -361,8 +411,32 @@
     _stm_minor_collect();
 
     /* Some operations require us to have the EXCLUSIVE lock */
-    stm_stop_shared_lock();
-    stm_start_exclusive_lock();
+    if (_STM_TL->active == 1) {
+        while (1) {
+            _stm_start_safe_point();
+            usleep(1);          /* XXX: better algorithm that allows
+                                   for waiting on a mutex */
+            stm_start_exclusive_lock();
+            if (_STM_TL->need_abort) {
+                stm_stop_exclusive_lock();
+                stm_start_shared_lock();
+                stm_abort_transaction();
+            }
+            
+            if (!inevitable_lock)
+                break;
+            stm_stop_exclusive_lock();
+            _stm_stop_safe_point();
+        }
+        /* we have the exclusive lock */
+    } else {
+        /* inevitable! no other transaction could have committed
+           or aborted us */
+        stm_stop_shared_lock();
+        stm_start_exclusive_lock();
+        assert(!_STM_TL->need_abort);
+        inevitable_lock = 0;
+    }
 
     _STM_TL->jmpbufptr = NULL;          /* cannot abort any more */
 
@@ -374,7 +448,7 @@
     stm_list_clear(_STM_TL->modified_objects);
 
  
-    _STM_TL->running_transaction = 0;
+    _STM_TL->active = 0;
     stm_stop_exclusive_lock();
     fprintf(stderr, "%c", 'C'+_STM_TL->thread_num*32);
 }
@@ -421,13 +495,13 @@
 void stm_abort_transaction(void)
 {
     /* here we hold the shared lock as a reader or writer */
-    assert(_STM_TL->running_transaction);
+    assert(_STM_TL->active == 1);
     
     nursery_on_abort();
     
     assert(_STM_TL->jmpbufptr != NULL);
     assert(_STM_TL->jmpbufptr != (jmpbufptr_t *)-1);   /* for tests only */
-    _STM_TL->running_transaction = 0;
+    _STM_TL->active = 0;
     stm_stop_shared_lock();
     fprintf(stderr, "%c", 'A'+_STM_TL->thread_num*32);
 
diff --git a/c7/core.h b/c7/core.h
--- a/c7/core.h
+++ b/c7/core.h
@@ -44,8 +44,6 @@
 
 
 
-
-
 #define TLPREFIX __attribute__((address_space(256)))
 
 typedef TLPREFIX struct _thread_local1_s _thread_local1_t;
@@ -94,7 +92,7 @@
     uint8_t transaction_read_version;
     
     int thread_num;
-    bool running_transaction;
+    uint8_t active;                /* 1 normal, 2 inevitable, 0 no trans. */
     bool need_abort;
     char *thread_base;
     struct stm_list_s *modified_objects;
@@ -226,8 +224,8 @@
 void stm_abort_transaction(void);
 
 void _stm_minor_collect();
-#define stm_become_inevitable(msg)   /* XXX implement me! */
-#define stm_start_inevitable_transaction() stm_start_transaction(NULL)   /* 
XXX implement me! */
+void stm_become_inevitable(char* msg);
+void stm_start_inevitable_transaction();
 
 struct _thread_local1_s* _stm_dbg_get_tl(int thread); /* -1 is current thread 
*/
 
diff --git a/c7/nursery.c b/c7/nursery.c
--- a/c7/nursery.c
+++ b/c7/nursery.c
@@ -19,7 +19,7 @@
 
 void stm_major_collection(void)
 {
-    assert(_STM_TL->running_transaction);
+    assert(_STM_TL->active);
     abort();
 }
 
@@ -190,7 +190,7 @@
 {
     _stm_start_safe_point();
     _stm_stop_safe_point();
-    assert(_STM_TL->running_transaction);
+    assert(_STM_TL->active);
     assert(size % 8 == 0);
     assert(16 <= size && size < NB_NURSERY_PAGES * 4096);//XXX
 
diff --git a/c7/stmsync.h b/c7/stmsync.h
--- a/c7/stmsync.h
+++ b/c7/stmsync.h
@@ -6,6 +6,9 @@
 void stm_start_exclusive_lock(void);
 void _stm_start_safe_point(void);
 void _stm_stop_safe_point(void);
-void _stm_reset_shared_lock();
+void _stm_reset_shared_lock(void);
 
+/* XXX: major collections must not be possible here: */
+#define _stm_start_no_collect_safe_point(void) _stm_start_safe_point()
+#define _stm_stop_no_collect_safe_point(void) _stm_stop_safe_point()
 
diff --git a/c7/test/support.py b/c7/test/support.py
--- a/c7/test/support.py
+++ b/c7/test/support.py
@@ -108,6 +108,10 @@
 size_t _stm_data_size(struct object_s *data);
 void _stm_chunk_pages(struct object_s *data, uintptr_t *start, uintptr_t *num);
 
+void stm_become_inevitable(char* msg);
+void stm_start_inevitable_transaction();
+bool _checked_stm_become_inevitable();
+
 """)
 
 lib = ffi.verify('''
@@ -137,6 +141,19 @@
 }
 
 
+bool _checked_stm_become_inevitable() {
+    jmpbufptr_t here;
+    if (__builtin_setjmp(here) == 0) { // returned directly
+         assert(_STM_TL->jmpbufptr == (jmpbufptr_t*)-1);
+         _STM_TL->jmpbufptr = &here;
+         stm_become_inevitable("TEST");
+         _STM_TL->jmpbufptr = (jmpbufptr_t*)-1;
+         return 0;
+    }
+    _STM_TL->jmpbufptr = (jmpbufptr_t*)-1;
+    return 1;
+}
+
 bool _checked_stm_write(object_t *object) {
     jmpbufptr_t here;
     if (__builtin_setjmp(here) == 0) { // returned directly
@@ -349,6 +366,10 @@
     if lib._stm_check_stop_safe_point():
         raise Conflict()
 
+def stm_become_inevitable():
+    if lib._checked_stm_become_inevitable():
+        raise Conflict()
+
 def stm_minor_collect():
     lib._stm_minor_collect()
 
diff --git a/c7/test/test_basic.py b/c7/test/test_basic.py
--- a/c7/test/test_basic.py
+++ b/c7/test/test_basic.py
@@ -473,6 +473,25 @@
         stm_stop_transaction()
         
 
+    def test_inevitable_transaction(self):
+        py.test.skip("stm_write and stm_stop_transaction"
+                     " of an inevitable tr. is not testable"
+                     " since they wait for the other thread"
+                     " to synchronize and possibly abort")
+
+        old = stm_allocate_old(16)
+        stm_start_transaction()
+
+        self.switch(1)
+        stm_start_transaction()
+        stm_write(old)
+
+        self.switch(0)
+        stm_become_inevitable()
+        stm_write(old) # t1 needs to abort, not us
+        stm_stop_transaction()
+
+        py.test.raises(Conflict, self.switch, 1)
         
     # def test_resolve_write_write_no_conflict(self):
     #     stm_start_transaction()
diff --git a/duhton/demo/run_transactions.duh b/duhton/demo/run_transactions.duh
new file mode 100644
--- /dev/null
+++ b/duhton/demo/run_transactions.duh
@@ -0,0 +1,25 @@
+
+(setq c (container 0))
+
+(defun g (n)
+  (setq i n)
+  (while (< 0 i)
+    (set c (+ (get c) 1))
+    (setq i (- i 1))
+    )
+  )
+
+(defun f (thread n)
+  (g n)
+  )
+
+(transaction f (quote t1) 10000)
+(transaction f (quote t2) 20000)
+(transaction f (quote t2) 10002)
+(run-transactions)
+(transaction f (quote t2) 15)
+(transaction f (quote t2) 15)
+(run-transactions)
+(print (quote result) (get c))
+(print (quote finished))
+
diff --git a/duhton/demo/sort.duh b/duhton/demo/sort.duh
--- a/duhton/demo/sort.duh
+++ b/duhton/demo/sort.duh
@@ -28,14 +28,13 @@
 
 
 (defun random_list (n)
-  (if (> n 0)
-      (progn
-        (setq res (random_list (- n 1)))
-        (append res (% (xor128) 10))
-        res
-        )
-    (list )
+  (setq i n)
+  (setq res (list))
+  (while (> i 0)
+    (append res (% (xor128) 10))
+    (setq i (- i 1))
     )
+  res
   )
 
 
@@ -62,33 +61,21 @@
   )
 
 
-(defun append_to_correct_half (xs first second half_len)
-  (if (< 0 (len xs))
-      (progn
-        (setq elem (pop xs 0))
-        (if (< 0 half_len)
-            (append_to_correct_half xs
-                                    (append first elem)
-                                    second
-                                    (- half_len 1))
-          
-          (append_to_correct_half xs
-                                  first
-                                  (append second elem)
-                                  (- half_len 1))
-          )
-        )
-    )
-  )
 
 (defun split_list (xs)
   ;; empties xs and fills 2 new lists to be returned
   (setq half_len (/ (len xs) 2))
-
   (setq first (list))
   (setq second (list))
 
-  (append_to_correct_half xs first second half_len)
+  (while (< 0 (len xs))
+    (if (< 0 half_len)
+        (append first (pop xs 0))
+      (append second (pop xs 0))
+      )
+    (setq half_len (- half_len 1))
+    )
+
   (list first second)
   )
 
@@ -110,19 +97,16 @@
   )
 
 
-(defun copy_list_helper (xs res idx)
-  (if (< idx (len xs))
-      (progn
-        (append res (get xs idx))
-        (copy_list_helper xs res (+ idx 1))
-        )
-    )
-  )
 (defun copy_list (xs)
   (setq res (list))
-  (copy_list_helper xs res 0)
+  (setq idx 0)
+  (while (< idx (len xs))
+    (append res (get xs idx))
+    (setq idx (+ idx 1))
+    )
   res
   )
+
 (defun print_list (xs)
   (print (quote len:) (len xs) (quote ->) xs)
   )
@@ -130,11 +114,11 @@
 
 
 
-(setq as (random_list 20))
-(setq bs (random_list 20))
-(print as)
-(print bs)
-(print (split_list as))
+;; (setq as (random_list 20))
+;; (setq bs (random_list 20))
+;; (print as)
+;; (print bs)
+;; (print (split_list as))
 
 (setq cs (random_list 10000))
 (print_list cs)
diff --git a/duhton/duhton.c b/duhton/duhton.c
--- a/duhton/duhton.c
+++ b/duhton/duhton.c
@@ -42,7 +42,7 @@
             printf("))) ");
             fflush(stdout);
         }
-        stm_start_transaction(NULL);
+        stm_start_inevitable_transaction();
         DuObject *code = Du_Compile(filename, interactive);
         _du_save1(code);
         stm_stop_transaction();
@@ -53,7 +53,7 @@
         }
         /*Du_Print(code, 1);
           printf("\n");*/
-        stm_start_transaction(NULL);
+        stm_start_inevitable_transaction();
         DuObject *res = Du_Eval(code, Du_Globals);
         if (interactive) {
             Du_Print(res, 1);
diff --git a/duhton/glob.c b/duhton/glob.c
--- a/duhton/glob.c
+++ b/duhton/glob.c
@@ -771,7 +771,7 @@
     all_threads_count = num_threads;
     all_threads = (pthread_t*)malloc(sizeof(pthread_t) * num_threads);
 
-    stm_start_transaction(NULL);
+    stm_start_inevitable_transaction();
     DuFrame_SetBuiltinMacro(Du_Globals, "progn", Du_Progn);
     DuFrame_SetBuiltinMacro(Du_Globals, "setq", du_setq);
     DuFrame_SetBuiltinMacro(Du_Globals, "print", du_print);
diff --git a/duhton/transaction.c b/duhton/transaction.c
--- a/duhton/transaction.c
+++ b/duhton/transaction.c
@@ -61,7 +61,7 @@
     if (stm_thread_local_obj == NULL)
         return;
 
-    stm_start_transaction(NULL);
+    stm_start_inevitable_transaction();
     DuConsObject *root = du_pending_transactions;
     _du_write1(root);
     root->cdr = stm_thread_local_obj;
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to