Author: Remi Meier <[email protected]>
Branch: parallel-pulling
Changeset: r1252:a68ae11d76fa
Date: 2014-06-19 13:46 +0200
http://bitbucket.org/pypy/stmgc/changeset/a68ae11d76fa/

Log:    try the most naive way to parallelize commit pushing

diff --git a/c7/stm/core.c b/c7/stm/core.c
--- a/c7/stm/core.c
+++ b/c7/stm/core.c
@@ -225,6 +225,7 @@
     dprintf(("start_transaction\n"));
 
     s_mutex_unlock();
+    pull_committed_changes();
 
     /* Now running the SP_RUNNING start.  We can set our
        'transaction_read_version' after releasing the mutex,
@@ -355,7 +356,7 @@
     }
 }
 
-static void synchronize_object_now(object_t *obj)
+static void synchronize_object_now(object_t *obj, bool lazy_on_commit)
 {
     /* Copy around the version of 'obj' that lives in our own segment.
        It is first copied into the shared pages, and then into other
@@ -381,6 +382,12 @@
         uintptr_t last_page = (end - 1) / 4096UL;
         long i, myself = STM_SEGMENT->segment_num;
 
+        bool private_in_segment[NB_SEGMENTS];
+        if (lazy_on_commit) {
+            for (i = 1; i <= NB_SEGMENTS; i++)
+                private_in_segment[i-1] = false;
+        }
+
         for (; first_page <= last_page; first_page++) {
 
             uintptr_t copy_size;
@@ -410,20 +417,25 @@
                 assert(memcmp(dst, src, copy_size) == 0);  /* same page */
             }
 
+            /* now copy from the shared page to all private pages */
+            src = REAL_ADDRESS(stm_object_pages, start);
             for (i = 1; i <= NB_SEGMENTS; i++) {
                 if (i == myself)
                     continue;
 
-                src = REAL_ADDRESS(stm_object_pages, start);
                 dst = REAL_ADDRESS(get_segment_base(i), start);
                 if (is_private_page(i, first_page)) {
                     /* The page is a private page.  We need to diffuse this
                        fragment of object from the shared page to this private
                        page. */
-                    if (copy_size == 4096)
-                        pagecopy(dst, src);
-                    else
-                        memcpy(dst, src, copy_size);
+                    if (!lazy_on_commit) {
+                        if (copy_size == 4096)
+                            pagecopy(dst, src);
+                        else
+                            memcpy(dst, src, copy_size);
+                    }
+
+                    private_in_segment[i-1] = true;
                 }
                 else {
                     assert(!memcmp(dst, src, copy_size));  /* same page */
@@ -432,6 +444,15 @@
 
             start = (start + 4096) & ~4095;
         }
+
+        if (lazy_on_commit) {
+            for (i = 1; i <= NB_SEGMENTS; i++) {
+                if (private_in_segment[i-1]) {
+                    struct stm_priv_segment_info_s *pseg = get_priv_segment(i);
+                    LIST_APPEND(pseg->outdated_objects, obj);
+                }
+            }
+        }
     }
 }
 
@@ -442,7 +463,7 @@
 
     acquire_privatization_lock();
     LIST_FOREACH_R(STM_PSEGMENT->large_overflow_objects, object_t *,
-                   synchronize_object_now(item));
+                   synchronize_object_now(item, false));
     release_privatization_lock();
 }
 
@@ -466,7 +487,7 @@
 
             /* copy the object to the shared page, and to the other
                private pages as needed */
-            synchronize_object_now(item);
+            synchronize_object_now(item, true);
         }));
     release_privatization_lock();
 
@@ -559,6 +580,7 @@
     /* cannot access STM_SEGMENT or STM_PSEGMENT from here ! */
 
     s_mutex_unlock();
+    pull_committed_changes();
 }
 
 void stm_abort_transaction(void)
@@ -567,6 +589,41 @@
     abort_with_mutex();
 }
 
+static void copy_objs_from_segment_0(int segment_num, struct list_s *lst)
+{
+    /* pull the list of objects from segment 0. This either resets
+       modifications or just updates the view of the current segment.
+    */
+    char *local_base = get_segment_base(segment_num);
+    char *zero_base = get_segment_base(0);
+
+    LIST_FOREACH_R(lst, object_t * /*item*/,
+        ({
+            /* memcpy in the opposite direction than
+               push_modified_to_other_segments() */
+            char *src = REAL_ADDRESS(zero_base, item);
+            char *dst = REAL_ADDRESS(local_base, item);
+            ssize_t size = stmcb_size_rounded_up((struct object_s *)src);
+            memcpy(dst, src, size);
+
+            /* all objs in segment 0 should have the WB flag: */
+            assert(((struct object_s *)dst)->stm_flags & GCFLAG_WRITE_BARRIER);
+        }));
+    write_fence();
+}
+
+static void pull_committed_changes()
+{
+    struct list_s *lst = STM_PSEGMENT->outdated_objects;
+
+    if (list_count(lst)) {
+        dprintf(("pulling %lu objects from shared segment\n", 
list_count(lst)));
+        copy_objs_from_segment_0(STM_SEGMENT->segment_num, lst);
+        list_clear(lst);
+    }
+}
+
+
 static void
 reset_modified_from_other_segments(int segment_num)
 {
@@ -723,6 +780,7 @@
 {
     s_mutex_lock();
     enter_safe_point_if_requested();
+    pull_committed_changes();   /* XXX: not sure if necessary */
 
     if (STM_PSEGMENT->transaction_state == TS_REGULAR) {
         dprintf(("become_inevitable: %s\n", msg));
@@ -739,6 +797,7 @@
     }
 
     s_mutex_unlock();
+    pull_committed_changes();
 }
 
 void stm_become_globally_unique_transaction(stm_thread_local_t *tl,
@@ -749,4 +808,5 @@
     s_mutex_lock();
     synchronize_all_threads(STOP_OTHERS_AND_BECOME_GLOBALLY_UNIQUE);
     s_mutex_unlock();
+    pull_committed_changes();
 }
diff --git a/c7/stm/core.h b/c7/stm/core.h
--- a/c7/stm/core.h
+++ b/c7/stm/core.h
@@ -75,6 +75,10 @@
 struct stm_priv_segment_info_s {
     struct stm_segment_info_s pub;
 
+    /* list of objects that were committed while we waited in a
+       safe point. This means we have an outdated copy of them. */
+    struct list_s *outdated_objects;
+
     /* List of old objects (older than the current transaction) that the
        current transaction attempts to modify.  This is used to track
        the STM status: they are old objects that where written to and
@@ -257,7 +261,8 @@
 }
 
 static void copy_object_to_shared(object_t *obj, int source_segment_num);
-static void synchronize_object_now(object_t *obj);
+static void synchronize_object_now(object_t *obj, bool lazy_on_commit);
+static void pull_committed_changes();
 
 static inline void acquire_privatization_lock(void)
 {
diff --git a/c7/stm/forksupport.c b/c7/stm/forksupport.c
--- a/c7/stm/forksupport.c
+++ b/c7/stm/forksupport.c
@@ -66,6 +66,7 @@
 
     s_mutex_lock();
     synchronize_all_threads(STOP_OTHERS_UNTIL_MUTEX_UNLOCK);
+    pull_committed_changes();   /* XXX: unclear if necessary */
 
     /* Make a new mmap at some other address, but of the same size as
        the standard mmap at stm_object_pages
diff --git a/c7/stm/gcpage.c b/c7/stm/gcpage.c
--- a/c7/stm/gcpage.c
+++ b/c7/stm/gcpage.c
@@ -153,6 +153,7 @@
     }
 
     s_mutex_unlock();
+    pull_committed_changes();
 }
 
 
diff --git a/c7/stm/nursery.c b/c7/stm/nursery.c
--- a/c7/stm/nursery.c
+++ b/c7/stm/nursery.c
@@ -216,7 +216,7 @@
             */
             if (STM_PSEGMENT->minor_collect_will_commit_now) {
                 acquire_privatization_lock();
-                synchronize_object_now(obj);
+                synchronize_object_now(obj, false);
                 release_privatization_lock();
             } else {
                 LIST_APPEND(STM_PSEGMENT->large_overflow_objects, obj);
diff --git a/c7/stm/setup.c b/c7/stm/setup.c
--- a/c7/stm/setup.c
+++ b/c7/stm/setup.c
@@ -118,6 +118,7 @@
         pr->pub.segment_base = segment_base;
         pr->objects_pointing_to_nursery = NULL;
         pr->large_overflow_objects = NULL;
+        pr->outdated_objects = list_create();
         pr->modified_old_objects = list_create();
         pr->modified_old_objects_markers = list_create();
         pr->young_weakrefs = list_create();
@@ -157,6 +158,7 @@
         struct stm_priv_segment_info_s *pr = get_priv_segment(i);
         assert(pr->objects_pointing_to_nursery == NULL);
         assert(pr->large_overflow_objects == NULL);
+        list_free(pr->outdated_objects);
         list_free(pr->modified_old_objects);
         list_free(pr->modified_old_objects_markers);
         list_free(pr->young_weakrefs);
diff --git a/c7/stm/sync.c b/c7/stm/sync.c
--- a/c7/stm/sync.c
+++ b/c7/stm/sync.c
@@ -255,6 +255,7 @@
     STM_PSEGMENT->safe_point = SP_RUNNING;
 
     stm_safe_point();
+    pull_committed_changes();
 }
 #endif
 
@@ -439,4 +440,5 @@
     s_mutex_lock();
     enter_safe_point_if_requested();
     s_mutex_unlock();
+    pull_committed_changes();
 }
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to