Author: Remi Meier
Branch: c7
Changeset: r687:58ad71386b61
Date: 2014-01-29 10:36 +0100
http://bitbucket.org/pypy/stmgc/changeset/58ad71386b61/
Log: implement inevitable transactions
diff --git a/c7/core.c b/c7/core.c
--- a/c7/core.c
+++ b/c7/core.c
@@ -22,7 +22,7 @@
char *object_pages;
static int num_threads_started;
uint8_t write_locks[READMARKER_END - READMARKER_START];
-
+volatile uint8_t inevitable_lock;
struct _thread_local1_s* _stm_dbg_get_tl(int thread)
{
@@ -125,6 +125,7 @@
uintptr_t lock_idx = (((uintptr_t)obj) >> 4) - READMARKER_START;
uint8_t lock_num = _STM_TL->thread_num + 1;
uint8_t prev_owner;
+ retry:
do {
prev_owner = __sync_val_compare_and_swap(&write_locks[lock_idx],
0, lock_num);
@@ -132,7 +133,16 @@
/* if there was no lock-holder or we already have the lock */
if ((!prev_owner) || (prev_owner == lock_num))
break;
-
+
+ if (_STM_TL->active == 2) {
+ /* we must succeed! */
+ _stm_dbg_get_tl(prev_owner - 1)->need_abort = 1;
+ _stm_start_no_collect_safe_point();
+ /* XXX: not good */
+ usleep(1);
+ _stm_stop_no_collect_safe_point();
+ goto retry;
+ }
/* XXXXXX */
//_stm_start_semi_safe_point();
//usleep(1);
@@ -161,6 +171,8 @@
{
_stm_reset_shared_lock();
_stm_reset_pages();
+
+ inevitable_lock = 0;
/* Check that some values are acceptable */
assert(4096 <= ((uintptr_t)_STM_TL));
@@ -259,12 +271,12 @@
_STM_TL->modified_objects = stm_list_create();
_STM_TL->uncommitted_objects = stm_list_create();
- assert(!_STM_TL->running_transaction);
+ assert(!_STM_TL->active);
}
bool _stm_is_in_transaction(void)
{
- return _STM_TL->running_transaction;
+ return _STM_TL->active;
}
void _stm_teardown_thread(void)
@@ -288,6 +300,7 @@
void _stm_teardown(void)
{
+ assert(inevitable_lock == 0);
munmap(object_pages, TOTAL_MEMORY);
_stm_reset_pages();
memset(write_locks, 0, sizeof(write_locks));
@@ -330,9 +343,46 @@
}
+void stm_become_inevitable(char* msg)
+{ /* Turn the current thread's running transaction into an inevitable one
+   * (_STM_TL->active goes from 1 to 2) and record this thread as the holder
+   * of the global inevitable_lock.  Loops until inevitable_lock is found
+   * free (0) while the exclusive lock is held.  If this thread's need_abort
+   * flag is found set during that wait, stm_abort_transaction() is called
+   * instead of upgrading.
+   * msg: not used anywhere in this function body.
+   */
+ if (_STM_TL->active == 2) /* already inevitable: nothing to do */
+ return;
+ assert(_STM_TL->active == 1); /* must be inside a normal transaction */
+
+ uint8_t our_lock = _STM_TL->thread_num + 1; /* +1 so that 0 can mean "not held" */
+ do {
+ _stm_start_safe_point();
+
+ stm_start_exclusive_lock();
+ if (_STM_TL->need_abort) {
+ stm_stop_exclusive_lock();
+ stm_start_shared_lock(); /* stm_abort_transaction() ends by releasing the shared lock */
+ stm_abort_transaction(); /* NOTE(review): presumably does not return (jumps to jmpbufptr) -- confirm */
+ }
+
+ if (!inevitable_lock) /* free: leave the loop still holding the exclusive lock */
+ break;
+
+ stm_stop_exclusive_lock(); /* currently held: release everything and retry */
+ _stm_stop_safe_point();
+ } while (1); /* NOTE(review): retries without sleeping, unlike the usleep(1) wait loop in stm_stop_transaction */
+
+ inevitable_lock = our_lock;
+ _STM_TL->active = 2;
+ stm_stop_exclusive_lock();
+
+ _stm_stop_safe_point();
+}
+
+/* Start a new transaction and immediately make it inevitable.
+ *
+ * The transaction is started with a NULL jmpbufptr and then upgraded through
+ * stm_become_inevitable().  Takes no arguments and returns nothing; the
+ * parameter list is spelled (void) so that the definition is a real
+ * prototype and the compiler rejects calls that pass arguments.
+ *
+ * NOTE(review): if need_abort were set on this thread before the upgrade
+ * completes, stm_become_inevitable() would call stm_abort_transaction(),
+ * which asserts jmpbufptr != NULL -- confirm that cannot happen here.
+ */
+void stm_start_inevitable_transaction(void)
+{
+ stm_start_transaction(NULL);
+ stm_become_inevitable("stm_start_inevitable_transaction");
+}
+
void stm_start_transaction(jmpbufptr_t *jmpbufptr)
{
- assert(!_STM_TL->running_transaction);
+ assert(!_STM_TL->active);
stm_start_shared_lock();
@@ -346,14 +396,14 @@
nursery_on_start();
_STM_TL->jmpbufptr = jmpbufptr;
- _STM_TL->running_transaction = 1;
+ _STM_TL->active = 1;
_STM_TL->need_abort = 0;
}
void stm_stop_transaction(void)
{
- assert(_STM_TL->running_transaction);
+ assert(_STM_TL->active);
/* do the minor_collection here and not in nursery_on_commit,
since here we can still run concurrently with other threads
@@ -361,8 +411,32 @@
_stm_minor_collect();
/* Some operations require us to have the EXCLUSIVE lock */
- stm_stop_shared_lock();
- stm_start_exclusive_lock();
+ if (_STM_TL->active == 1) {
+ while (1) {
+ _stm_start_safe_point();
+ usleep(1); /* XXX: better algorithm that allows
+ for waiting on a mutex */
+ stm_start_exclusive_lock();
+ if (_STM_TL->need_abort) {
+ stm_stop_exclusive_lock();
+ stm_start_shared_lock();
+ stm_abort_transaction();
+ }
+
+ if (!inevitable_lock)
+ break;
+ stm_stop_exclusive_lock();
+ _stm_stop_safe_point();
+ }
+ /* we have the exclusive lock */
+ } else {
+ /* inevitable! no other transaction could have committed
+ or aborted us */
+ stm_stop_shared_lock();
+ stm_start_exclusive_lock();
+ assert(!_STM_TL->need_abort);
+ inevitable_lock = 0;
+ }
_STM_TL->jmpbufptr = NULL; /* cannot abort any more */
@@ -374,7 +448,7 @@
stm_list_clear(_STM_TL->modified_objects);
- _STM_TL->running_transaction = 0;
+ _STM_TL->active = 0;
stm_stop_exclusive_lock();
fprintf(stderr, "%c", 'C'+_STM_TL->thread_num*32);
}
@@ -421,13 +495,13 @@
void stm_abort_transaction(void)
{
/* here we hold the shared lock as a reader or writer */
- assert(_STM_TL->running_transaction);
+ assert(_STM_TL->active == 1);
nursery_on_abort();
assert(_STM_TL->jmpbufptr != NULL);
assert(_STM_TL->jmpbufptr != (jmpbufptr_t *)-1); /* for tests only */
- _STM_TL->running_transaction = 0;
+ _STM_TL->active = 0;
stm_stop_shared_lock();
fprintf(stderr, "%c", 'A'+_STM_TL->thread_num*32);
diff --git a/c7/core.h b/c7/core.h
--- a/c7/core.h
+++ b/c7/core.h
@@ -44,8 +44,6 @@
-
-
#define TLPREFIX __attribute__((address_space(256)))
typedef TLPREFIX struct _thread_local1_s _thread_local1_t;
@@ -94,7 +92,7 @@
uint8_t transaction_read_version;
int thread_num;
- bool running_transaction;
+ uint8_t active; /* 1 normal, 2 inevitable, 0 no trans. */
bool need_abort;
char *thread_base;
struct stm_list_s *modified_objects;
@@ -226,8 +224,8 @@
void stm_abort_transaction(void);
void _stm_minor_collect();
-#define stm_become_inevitable(msg) /* XXX implement me! */
-#define stm_start_inevitable_transaction() stm_start_transaction(NULL) /*
XXX implement me! */
+void stm_become_inevitable(char* msg);
+void stm_start_inevitable_transaction();
struct _thread_local1_s* _stm_dbg_get_tl(int thread); /* -1 is current thread
*/
diff --git a/c7/nursery.c b/c7/nursery.c
--- a/c7/nursery.c
+++ b/c7/nursery.c
@@ -19,7 +19,7 @@
void stm_major_collection(void)
{
- assert(_STM_TL->running_transaction);
+ assert(_STM_TL->active);
abort();
}
@@ -190,7 +190,7 @@
{
_stm_start_safe_point();
_stm_stop_safe_point();
- assert(_STM_TL->running_transaction);
+ assert(_STM_TL->active);
assert(size % 8 == 0);
assert(16 <= size && size < NB_NURSERY_PAGES * 4096);//XXX
diff --git a/c7/stmsync.h b/c7/stmsync.h
--- a/c7/stmsync.h
+++ b/c7/stmsync.h
@@ -6,6 +6,9 @@
void stm_start_exclusive_lock(void);
void _stm_start_safe_point(void);
void _stm_stop_safe_point(void);
-void _stm_reset_shared_lock();
+void _stm_reset_shared_lock(void);
+/* XXX: major collections must not be possible here: */
+/* Zero-argument aliases for the ordinary safe-point entry/exit functions.
+ * The macro parameter list is the empty "()": in a #define, writing "(void)"
+ * would declare one macro parameter named void, not an empty list. */
+#define _stm_start_no_collect_safe_point() _stm_start_safe_point()
+#define _stm_stop_no_collect_safe_point() _stm_stop_safe_point()
diff --git a/c7/test/support.py b/c7/test/support.py
--- a/c7/test/support.py
+++ b/c7/test/support.py
@@ -108,6 +108,10 @@
size_t _stm_data_size(struct object_s *data);
void _stm_chunk_pages(struct object_s *data, uintptr_t *start, uintptr_t *num);
+void stm_become_inevitable(char* msg);
+void stm_start_inevitable_transaction();
+bool _checked_stm_become_inevitable();
+
""")
lib = ffi.verify('''
@@ -137,6 +141,19 @@
}
+bool _checked_stm_become_inevitable() { /* Test helper around stm_become_inevitable().
+   * Returns 0 when the call returns normally and 1 when control comes back
+   * through the jump buffer `here` instead.  On both paths jmpbufptr is
+   * reset to the (jmpbufptr_t*)-1 marker before returning.
+   */
+ jmpbufptr_t here;
+ if (__builtin_setjmp(here) == 0) { // returned directly
+ assert(_STM_TL->jmpbufptr == (jmpbufptr_t*)-1); /* the -1 marker must be in place on entry */
+ _STM_TL->jmpbufptr = &here; /* NOTE(review): presumably an abort inside the call jumps back to `here` -- confirm */
+ stm_become_inevitable("TEST");
+ _STM_TL->jmpbufptr = (jmpbufptr_t*)-1;
+ return 0; /* normal return from stm_become_inevitable() */
+ }
+ _STM_TL->jmpbufptr = (jmpbufptr_t*)-1; /* reached only via the jump buffer */
+ return 1;
+}
+
bool _checked_stm_write(object_t *object) {
jmpbufptr_t here;
if (__builtin_setjmp(here) == 0) { // returned directly
@@ -349,6 +366,10 @@
if lib._stm_check_stop_safe_point():
raise Conflict()
+def stm_become_inevitable():  # Python-side wrapper over the C helper _checked_stm_become_inevitable()
+ if lib._checked_stm_become_inevitable():  # true result: the helper came back through its jump buffer
+ raise Conflict()  # reported to the test as a Conflict exception
+
def stm_minor_collect():
lib._stm_minor_collect()
diff --git a/c7/test/test_basic.py b/c7/test/test_basic.py
--- a/c7/test/test_basic.py
+++ b/c7/test/test_basic.py
@@ -473,6 +473,25 @@
stm_stop_transaction()
+ def test_inevitable_transaction(self):  # scenario: an inevitable transaction writes an object another transaction already wrote
+ py.test.skip("stm_write and stm_stop_transaction"
+ " of an inevitable tr. is not testable"
+ " since they wait for the other thread"
+ " to synchronize and possibly abort")  # skip() raises, so none of the statements below run
+
+ old = stm_allocate_old(16)
+ stm_start_transaction()
+
+ self.switch(1)  # presumably continues as thread 1 -- confirm against the test base class
+ stm_start_transaction()
+ stm_write(old)
+
+ self.switch(0)  # presumably back to thread 0 -- confirm
+ stm_become_inevitable()
+ stm_write(old) # t1 needs to abort, not us
+ stm_stop_transaction()
+
+ py.test.raises(Conflict, self.switch, 1)  # switching to 1 again is expected to raise Conflict
# def test_resolve_write_write_no_conflict(self):
# stm_start_transaction()
diff --git a/duhton/demo/run_transactions.duh b/duhton/demo/run_transactions.duh
new file mode 100644
--- /dev/null
+++ b/duhton/demo/run_transactions.duh
@@ -0,0 +1,25 @@
+
+;; Demo script: several transactions all increment one shared container.
+;; c holds the shared counter, starting at 0.
+(setq c (container 0))
+
+;; g adds 1 to the value stored in c, n times (i counts down from n to 0).
+(defun g (n)
+ (setq i n)
+ (while (< 0 i)
+ (set c (+ (get c) 1))
+ (setq i (- i 1))
+ )
+ )
+
+;; f only forwards n to g; its first argument (thread) is not used.
+(defun f (thread n)
+ (g n)
+ )
+
+;; First batch of three calls to f, then a second batch of two.
+;; NOTE(review): presumably `transaction` queues a call and `run-transactions`
+;; executes everything queued so far -- confirm against the Duhton builtins.
+;; The quoted labels (t1, t2) are passed as f's unused first argument.
+(transaction f (quote t1) 10000)
+(transaction f (quote t2) 20000)
+(transaction f (quote t2) 10002)
+(run-transactions)
+(transaction f (quote t2) 15)
+(transaction f (quote t2) 15)
+(run-transactions)
+;; If every queued call runs exactly once and no increment is lost, the
+;; printed result is 10000 + 20000 + 10002 + 15 + 15 = 40032.
+(print (quote result) (get c))
+(print (quote finished))
+
diff --git a/duhton/demo/sort.duh b/duhton/demo/sort.duh
--- a/duhton/demo/sort.duh
+++ b/duhton/demo/sort.duh
@@ -28,14 +28,13 @@
(defun random_list (n)
- (if (> n 0)
- (progn
- (setq res (random_list (- n 1)))
- (append res (% (xor128) 10))
- res
- )
- (list )
+ (setq i n)
+ (setq res (list))
+ (while (> i 0)
+ (append res (% (xor128) 10))
+ (setq i (- i 1))
)
+ res
)
@@ -62,33 +61,21 @@
)
-(defun append_to_correct_half (xs first second half_len)
- (if (< 0 (len xs))
- (progn
- (setq elem (pop xs 0))
- (if (< 0 half_len)
- (append_to_correct_half xs
- (append first elem)
- second
- (- half_len 1))
-
- (append_to_correct_half xs
- first
- (append second elem)
- (- half_len 1))
- )
- )
- )
- )
(defun split_list (xs)
;; empties xs and fills 2 new lists to be returned
(setq half_len (/ (len xs) 2))
-
(setq first (list))
(setq second (list))
- (append_to_correct_half xs first second half_len)
+ (while (< 0 (len xs))
+ (if (< 0 half_len)
+ (append first (pop xs 0))
+ (append second (pop xs 0))
+ )
+ (setq half_len (- half_len 1))
+ )
+
(list first second)
)
@@ -110,19 +97,16 @@
)
-(defun copy_list_helper (xs res idx)
- (if (< idx (len xs))
- (progn
- (append res (get xs idx))
- (copy_list_helper xs res (+ idx 1))
- )
- )
- )
(defun copy_list (xs)
(setq res (list))
- (copy_list_helper xs res 0)
+ (setq idx 0)
+ (while (< idx (len xs))
+ (append res (get xs idx))
+ (setq idx (+ idx 1))
+ )
res
)
+
(defun print_list (xs)
(print (quote len:) (len xs) (quote ->) xs)
)
@@ -130,11 +114,11 @@
-(setq as (random_list 20))
-(setq bs (random_list 20))
-(print as)
-(print bs)
-(print (split_list as))
+;; (setq as (random_list 20))
+;; (setq bs (random_list 20))
+;; (print as)
+;; (print bs)
+;; (print (split_list as))
(setq cs (random_list 10000))
(print_list cs)
diff --git a/duhton/duhton.c b/duhton/duhton.c
--- a/duhton/duhton.c
+++ b/duhton/duhton.c
@@ -42,7 +42,7 @@
printf("))) ");
fflush(stdout);
}
- stm_start_transaction(NULL);
+ stm_start_inevitable_transaction();
DuObject *code = Du_Compile(filename, interactive);
_du_save1(code);
stm_stop_transaction();
@@ -53,7 +53,7 @@
}
/*Du_Print(code, 1);
printf("\n");*/
- stm_start_transaction(NULL);
+ stm_start_inevitable_transaction();
DuObject *res = Du_Eval(code, Du_Globals);
if (interactive) {
Du_Print(res, 1);
diff --git a/duhton/glob.c b/duhton/glob.c
--- a/duhton/glob.c
+++ b/duhton/glob.c
@@ -771,7 +771,7 @@
all_threads_count = num_threads;
all_threads = (pthread_t*)malloc(sizeof(pthread_t) * num_threads);
- stm_start_transaction(NULL);
+ stm_start_inevitable_transaction();
DuFrame_SetBuiltinMacro(Du_Globals, "progn", Du_Progn);
DuFrame_SetBuiltinMacro(Du_Globals, "setq", du_setq);
DuFrame_SetBuiltinMacro(Du_Globals, "print", du_print);
diff --git a/duhton/transaction.c b/duhton/transaction.c
--- a/duhton/transaction.c
+++ b/duhton/transaction.c
@@ -61,7 +61,7 @@
if (stm_thread_local_obj == NULL)
return;
- stm_start_transaction(NULL);
+ stm_start_inevitable_transaction();
DuConsObject *root = du_pending_transactions;
_du_write1(root);
root->cdr = stm_thread_local_obj;
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit