Author: Armin Rigo <[email protected]>
Branch: bag
Changeset: r1582:21b4163950f8
Date: 2015-01-24 11:54 +0100
http://bitbucket.org/pypy/stmgc/changeset/21b4163950f8/

Log:    in-progress

diff --git a/c7/stm/bag.c b/c7/stm/bag.c
--- a/c7/stm/bag.c
+++ b/c7/stm/bag.c
@@ -44,6 +44,7 @@
 struct stm_bag_seg_s {
     uintptr_t *deque_left, *deque_middle, *deque_right;
     struct list_s *abort_list;
+    uint64_t start_time;    /* unique_start_time of the last transaction that used this entry; 0 = none yet */
 };
 
 struct stm_bag_s {
@@ -62,6 +63,7 @@
         bs->deque_middle = &block->items[0];
         bs->deque_right = &block->items[0];
         LIST_CREATE(bs->abort_list);
+        bs->start_time = 0;
     }
     return bag;
 }
@@ -79,41 +81,105 @@
         }
         LIST_FREE(bs->abort_list);
     }
+
+    s_mutex_lock();
+    for (i = 0; i < STM_NB_SEGMENTS; i++) {
+        struct stm_bag_seg_s *bs = &bag->by_segment[i];
+        struct stm_segment_info_s *pub = get_segment(i + 1);
+        stm_thread_local_t *tl = pub->running_thread;
+        if (tl->associated_segment_num == i + 1) {
+            stm_call_on_abort(tl, bs, NULL);
+        }
+    }
+    s_mutex_unlock();
+
+    free(bag);
+}
+
+static void bag_add(struct stm_bag_seg_s *bs, object_t *newobj)   /* store newobj at deque_right and advance it */
+{
+    struct deque_block_s *block = deque_block(bs->deque_right);
+    *bs->deque_right++ = (uintptr_t)newobj;
+
+    if (bs->deque_right == &block->items[DEQUE_BLOCK_SIZE]) {   /* ran past the last slot of this block */
+        if (block->next == NULL)   /* may be non-NULL, e.g. after bag_abort_callback() moved deque_right back */
+            block->next = deque_new_block();
+        bs->deque_right = &block->next->items[0];
+    }
+}
+
+static void bag_abort_callback(void *key)   /* abort hook registered with stm_call_on_abort(); key is a struct stm_bag_seg_s */
+{
+    struct stm_bag_seg_s *bs = (struct stm_bag_seg_s *)key;
+
+    /* drop the "added in this transaction" items, i.e. those from deque_middle up to deque_right */
+    bs->deque_right = bs->deque_middle;
+
+    /* put back the items that stm_bag_try_pop() recorded in "abort_list" */
+    LIST_FOREACH_F(bs->abort_list, object_t *, bag_add(bs, item));
+    list_clear(bs->abort_list);
+
+    /* the reinstalled items are not "added in this transaction": move deque_middle past them */
+    bs->deque_middle = bs->deque_right;
+}
+
+static struct stm_bag_seg_s *bag_check_start_time(stm_bag_t *bag)   /* returns the running segment's entry of 'bag', refreshed on first use in a transaction */
+{
+    int i = STM_SEGMENT->segment_num - 1;   /* by_segment[] is 0-based, segment_num is not */
+    struct stm_bag_seg_s *bs = &bag->by_segment[i];
+
+    if (bs->start_time != STM_PSEGMENT->unique_start_time) {
+        /* There was a commit or an abort since the last operation
+           on the same bag in the same segment.  If there was an
+           abort, bag_abort_callback() should have been called to
+           reset the state.  Assume that any non-reset state is
+           there because of a commit.
+
+           The middle pointer moves to the right: there are no
+           more "added in this transaction" entries.  And the
+           "already popped items" list is forgotten.
+        */
+        bs->deque_middle = bs->deque_right;
+        list_clear(bs->abort_list);
+        bs->start_time = STM_PSEGMENT->unique_start_time;   /* so later calls in this transaction skip this branch */
+
+        /* We're about to modify the bag, so register an abort
+           callback now. */
+        stm_thread_local_t *tl = STM_SEGMENT->running_thread;
+        assert(tl->associated_segment_num == STM_SEGMENT->segment_num);
+        stm_call_on_abort(tl, bs, &bag_abort_callback);   /* bs is the key later handed to bag_abort_callback() */
+    }
+
+    return bs;
 }
 
 void stm_bag_add(stm_bag_t *bag, object_t *newobj)
 {
-    int i = STM_SEGMENT->segment_num - 1;
-    struct stm_bag_seg_s *bs = &bag->by_segment[i];
-    struct deque_block_s *block = deque_block(bs->deque_right);
-
-    *bs->deque_right++ = (uintptr_t)newobj;
-
-    if (bs->deque_right == &block->items[DEQUE_BLOCK_SIZE]) {
-        assert(block->next == NULL);
-        block->next = deque_new_block();
-        bs->deque_right = &block->next->items[0];
-    }
+    struct stm_bag_seg_s *bs = bag_check_start_time(bag);   /* this segment's entry; also registers the abort callback on first use */
+    bag_add(bs, newobj);   /* the append itself is shared with bag_abort_callback() */
 }
 
 object_t *stm_bag_try_pop(stm_bag_t *bag)
 {
-    int i = STM_SEGMENT->segment_num - 1;
-    struct stm_bag_seg_s *bs = &bag->by_segment[i];
+    struct stm_bag_seg_s *bs = bag_check_start_time(bag);   /* this segment's entry; also registers the abort callback on first use */
     if (bs->deque_left == bs->deque_right) {
         return NULL;
     }
+
     struct deque_block_s *block = deque_block(bs->deque_left);
-    bool any_old_item_to_pop = (bs->deque_left != bs->deque_middle);
+    bool from_same_transaction = (bs->deque_left == bs->deque_middle);   /* no older item left: the popped one was added in this transaction */
     uintptr_t result = *bs->deque_left++;
 
     if (bs->deque_left == &block->items[DEQUE_BLOCK_SIZE]) {
         bs->deque_left = &block->next->items[0];
         deque_free_block(block);
     }
-    if (!any_old_item_to_pop) {
+    if (from_same_transaction) {   /* keep deque_middle from falling behind deque_left */
         bs->deque_middle = bs->deque_left;
     }
+    else {
+        LIST_APPEND(bs->abort_list, result);   /* older item: record it so bag_abort_callback() can reinstall it */
+    }
     return (object_t *)result;
 }
 
diff --git a/c7/stm/contention.c b/c7/stm/contention.c
--- a/c7/stm/contention.c
+++ b/c7/stm/contention.c
@@ -73,7 +73,7 @@
 __attribute__((unused))
 static void cm_abort_the_younger(struct contmgr_s *cm)
 {
-    if (STM_PSEGMENT->start_time >= cm->other_pseg->start_time) {
+    if (STM_PSEGMENT->unique_start_time >= cm->other_pseg->unique_start_time) {
         /* We started after the other thread.  Abort */
         cm->abort_other = false;
     }
@@ -100,7 +100,7 @@
 __attribute__((unused))
 static void cm_pause_if_younger(struct contmgr_s *cm)
 {
-    if (STM_PSEGMENT->start_time >= cm->other_pseg->start_time) {
+    if (STM_PSEGMENT->unique_start_time >= cm->other_pseg->unique_start_time) {
         /* We started after the other thread.  Pause */
         cm->try_sleep = true;
         cm->abort_other = false;
diff --git a/c7/stm/core.c b/c7/stm/core.c
--- a/c7/stm/core.c
+++ b/c7/stm/core.c
@@ -324,7 +324,7 @@
     STM_SEGMENT->transaction_read_version = 1;
 }
 
-static uint64_t _global_start_time = 0;
+static uint64_t _global_start_time = 1;
 
 static void _stm_start_transaction(stm_thread_local_t *tl)
 {
@@ -337,7 +337,7 @@
     assert(STM_PSEGMENT->safe_point == SP_NO_TRANSACTION);
     assert(STM_PSEGMENT->transaction_state == TS_NONE);
     timing_event(tl, STM_TRANSACTION_START);
-    STM_PSEGMENT->start_time = _global_start_time++;
+    STM_PSEGMENT->unique_start_time = _global_start_time++;
     STM_PSEGMENT->signalled_to_commit_soon = false;
     STM_PSEGMENT->safe_point = SP_RUNNING;
     STM_PSEGMENT->marker_inev.object = NULL;
diff --git a/c7/stm/core.h b/c7/stm/core.h
--- a/c7/stm/core.h
+++ b/c7/stm/core.h
@@ -138,7 +138,7 @@
 
     /* Start time: to know approximately for how long a transaction has
        been running, in contention management */
-    uint64_t start_time;
+    uint64_t unique_start_time;
 
     /* This is the number stored in the overflowed objects (a multiple of
        GCFLAG_OVERFLOW_NUMBER_bit0).  It is incremented when the
diff --git a/c7/stm/list.h b/c7/stm/list.h
--- a/c7/stm/list.h
+++ b/c7/stm/list.h
@@ -96,6 +96,16 @@
         }                                       \
     } while (0)
 
+#define LIST_FOREACH_F(lst, TYPE, CODE)   /* run CODE once per entry, first to last, with 'item' bound to the entry cast to TYPE */ \
+    do {                                        \
+        struct list_s *_lst = (lst);   /* evaluate the 'lst' argument only once */ \
+        uintptr_t _i, _c = _lst->count;   /* count is read once, before the loop starts */ \
+        for (_i = 0; _i < _c; _i++) {           \
+            TYPE item = (TYPE)_lst->items[_i];  \
+            CODE;                               \
+        }                                       \
+    } while (0)
+
 /************************************************************/
 
 /* The tree_xx functions are, like the name hints, implemented as a tree,
diff --git a/c7/test/test_bag.py b/c7/test/test_bag.py
--- a/c7/test/test_bag.py
+++ b/c7/test/test_bag.py
@@ -131,3 +131,31 @@
         assert got == lp1
         #
         stm_major_collect()       # to get rid of the bag object
+
+    def test_abort_recovers_popped(self):   # items popped in an aborted transaction can be popped again afterwards
+        self.start_transaction()
+        q = self.allocate_bag()
+        self.push_root(q)
+        lp1 = stm_allocate(16)
+        lp2 = stm_allocate(16)
+        stm_set_char(lp1, 'M')
+        stm_set_char(lp2, 'N')
+        b_add(q, lp1)
+        b_add(q, lp2)
+        self.commit_transaction()
+        q = self.pop_root()
+        # second transaction: pop both committed items, then abort
+        self.start_transaction()
+        lp1 = b_pop(q)
+        lp2 = b_pop(q)
+        assert stm_get_char(lp1) == 'M'
+        assert stm_get_char(lp2) == 'N'
+        self.abort_transaction()
+        # third transaction: both items must be poppable again after the abort
+        self.start_transaction()
+        lp1 = b_pop(q)
+        lp2 = b_pop(q)
+        assert stm_get_char(lp1) == 'M'
+        assert stm_get_char(lp2) == 'N'
+        #
+        stm_major_collect()       # to get rid of the bag object
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to