Author: Armin Rigo <[email protected]>
Branch: concurrent-marksweep
Changeset: r47850:991e521500e4
Date: 2011-10-07 06:49 +0200
http://bitbucket.org/pypy/pypy/changeset/991e521500e4/

Log:    Marking.

diff --git a/pypy/rpython/lltypesystem/llarena.py 
b/pypy/rpython/lltypesystem/llarena.py
--- a/pypy/rpython/lltypesystem/llarena.py
+++ b/pypy/rpython/lltypesystem/llarena.py
@@ -77,8 +77,20 @@
         bytes = llmemory.raw_malloc_usage(size)
         if offset + bytes > self.nbytes:
             raise ArenaError("object overflows beyond the end of the arena")
+        # common case: 'size' starts with a GCHeaderOffset.  In this case
+        # we allocate with 'zero' even if the header doesn't have null
+        # bytes, as long as the rest of the object is over null bytes.
+        sz1 = size
+        while isinstance(sz1, RoundedUpForAllocation):
+            sz1 = sz1.basesize
+        if (isinstance(sz1, llmemory.CompositeOffset) and
+            isinstance(sz1.offsets[0], llmemory.GCHeaderOffset)):
+            hdrbytes = llmemory.raw_malloc_usage(sz1.offsets[0])
+            assert bytes >= hdrbytes
+        else:
+            hdrbytes = 0
         zero = True
-        for c in self.usagemap[offset:offset+bytes]:
+        for c in self.usagemap[offset+hdrbytes:offset+bytes]:
             if c == '0':
                 pass
             elif c == '#':
@@ -90,14 +102,10 @@
         pattern = letter.upper() + letter*(bytes-1)
         self.usagemap[offset:offset+bytes] = array.array('c', pattern)
         self.setobject(addr2, offset, bytes)
-        # common case: 'size' starts with a GCHeaderOffset.  In this case
-        # we can also remember that the real object starts after the header.
-        while isinstance(size, RoundedUpForAllocation):
-            size = size.basesize
-        if (isinstance(size, llmemory.CompositeOffset) and
-            isinstance(size.offsets[0], llmemory.GCHeaderOffset)):
-            objaddr = addr2 + size.offsets[0]
-            hdrbytes = llmemory.raw_malloc_usage(size.offsets[0])
+        # in the common case where 'size' starts with a GCHeaderOffset,
+        # we also remember that the real object starts after the header.
+        if hdrbytes > 0:
+            objaddr = addr2 + sz1.offsets[0]
             objoffset = offset + hdrbytes
             self.setobject(objaddr, objoffset, bytes - hdrbytes)
         return addr2
diff --git a/pypy/rpython/memory/gc/concurrentms.py 
b/pypy/rpython/memory/gc/concurrentms.py
--- a/pypy/rpython/memory/gc/concurrentms.py
+++ b/pypy/rpython/memory/gc/concurrentms.py
@@ -1,9 +1,12 @@
-from pypy.rpython.lltypesystem import lltype, llmemory, llarena, rffi
+import time, sys
+from pypy.rpython.lltypesystem import lltype, llmemory, llarena, llgroup, rffi
 from pypy.rpython.lltypesystem.lloperation import llop
 from pypy.rpython.lltypesystem.llmemory import raw_malloc_usage
+from pypy.rlib.objectmodel import we_are_translated, specialize
 from pypy.rlib.debug import ll_assert
-from pypy.rlib.rarithmetic import LONG_BIT
+from pypy.rlib.rarithmetic import LONG_BIT, r_uint
 from pypy.rpython.memory.gc.base import GCBase
+from pypy.module.thread import ll_thread
 
 #
 # A "mostly concurrent" mark&sweep GC.  It can delegate most of the GC
@@ -24,6 +27,10 @@
 WORD_POWER_2 = {32: 2, 64: 3}[LONG_BIT]
 assert 1 << WORD_POWER_2 == WORD
 size_of_addr = llmemory.sizeof(llmemory.Address)
+first_gcflag = 1 << (LONG_BIT//2)
+
+GCFLAG_MARK_TOGGLE = first_gcflag << 0
+GCFLAG_FINALIZATION_ORDERING = first_gcflag << 1
 
 
 class MostlyConcurrentMarkSweepGC(GCBase):
@@ -33,7 +40,7 @@
     needs_write_barrier = True
     prebuilt_gc_objects_are_static_roots = False
     malloc_zero_filled = True
-    #gcflag_extra = GCFLAG_FINALIZATION_ORDERING
+    gcflag_extra = GCFLAG_FINALIZATION_ORDERING
 
     HDR = lltype.Struct('header', ('tid', lltype.Signed))
     typeid_is_in_field = 'tid'
@@ -56,6 +63,54 @@
         self.free_lists = lltype.malloc(rffi.CArray(llmemory.Address),
                                         length, flavor='raw', zero=True,
                                         immortal=True)
+        self.current_mark = 0
+        #
+        # When the mutator thread wants to trigger the next collection,
+        # it scans its own stack roots and prepares everything, then
+        # sets 'collection_running' to True, and releases
+        # 'ready_to_start_lock'.  This triggers the collector thread,
+        # which re-acquires 'ready_to_start_lock' and does its job.
+        # When done it releases 'finished_lock'.  The mutator thread is
+        # responsible for resetting 'collection_running' to False.
+        self.collection_running = False
+        self.ready_to_start_lock = ll_thread.allocate_lock()
+        self.finished_lock = ll_thread.allocate_lock()
+        #
+        # NOT_RPYTHON: set to non-empty in _teardown()
+        self._teardown_now = []
+        #
+        def collector_start():
+            self.collector_run()
+        self.collector_start = collector_start
+        #
+        self.mutex_lock = ll_thread.allocate_lock()
+        self.gray_objects = self.AddressStack()
+        self.extra_objects_to_mark = self.AddressStack()
+        #
+        # Write barrier: actually a deletion barrier, triggered when there
+        # is a collection running and the mutator tries to change an object
+        # that was not scanned yet.
+        self._init_writebarrier_logic()
+
+    def setup(self):
+        "Start the concurrent collector thread."
+        self.acquire(self.finished_lock)
+        self.acquire(self.ready_to_start_lock)
+        self.collector_ident = ll_thread.start_new_thread(
+            self.collector_start, ())
+
+    def _teardown(self):
+        "NOT_RPYTHON.  Stop the collector thread after tests have run."
+        self.wait_for_the_end_of_collection()
+        #
+        # start the next collection, but with "stop" in _teardown_now,
+        # which should shut down the collector thread
+        self._teardown_now.append("stop")
+        self.collect()
+
+    def get_type_id(self, obj):
+        tid = self.header(obj).tid
+        return llop.extract_ushort(llgroup.HALFWORD, tid)
 
     def combine(self, typeid16, flags):
         return llop.combine_ushort(lltype.Signed, typeid16, flags)
@@ -76,7 +131,7 @@
                 llarena.arena_reset(result, size_of_addr, 0)
                 llarena.arena_reserve(result, totalsize)
                 hdr = llmemory.cast_adr_to_ptr(result, lltype.Ptr(self.HDR))
-                hdr.tid = self.combine(typeid, flags=0)
+                hdr.tid = self.combine(typeid, self.current_mark)
                 #
                 obj = result + size_gc_header
                 return llmemory.cast_adr_to_ptr(obj, llmemory.GCREF)
@@ -120,7 +175,7 @@
             # at the end (Hans Boehm, xxx ref).
             llarena.arena_reserve(result, totalsize)
             hdr = llmemory.cast_adr_to_ptr(result, lltype.Ptr(self.HDR))
-            hdr.tid = self.combine(typeid, flags=0)
+            hdr.tid = self.combine(typeid, self.current_mark)
             #
             obj = result + size_gc_header
             return llmemory.cast_adr_to_ptr(obj, llmemory.GCREF)
@@ -138,4 +193,171 @@
         llarena.arena_reserve(page, size_of_addr)
         page.address[0] = NULL
         self.free_pages = page
-    allocate_next_arena._dont_inline_ = True
+
+
+    def write_barrier(self, newvalue, addr_struct):
+        flag = self.header(addr_struct).tid & GCFLAG_MARK_TOGGLE
+        if flag != self.current_mark:
+            self.force_scan(addr_struct)
+
+    def _init_writebarrier_logic(self):
+        #
+        def force_scan(obj):
+            self.mutex_lock.acquire(True)
+            if self.current_mark:
+                self.set_mark_flag(obj, GCFLAG_MARK_TOGGLE)
+            else:
+                self.set_mark_flag(obj, 0)
+            self.trace(obj, self._barrier_add_extra, None)
+            self.mutex_lock.release()
+        #
+        force_scan._dont_inline_ = True
+        self.force_scan = force_scan
+
+    def _barrier_add_extra(self, root, ignored):
+        self.extra_objects_to_mark.append(root.address[0])
+
+
+    def collect(self, gen=0):
+        """Trigger a complete collection, and wait for it to finish."""
+        self.trigger_next_collection()
+        self.wait_for_the_end_of_collection()
+
+    def wait_for_the_end_of_collection(self):
+        """In the mutator thread: wait for the collection currently
+        running (if any) to finish."""
+        if self.collection_running:
+            self.acquire(self.finished_lock)
+            self.collection_running = False
+            #
+            # It's possible that an object was added to 'extra_objects_to_mark'
+            # by the write barrier but not taken out by the collector thread,
+            # because it finished in the meantime.  The result is still
+            # correct, but we need to clear the list.
+            self.extra_objects_to_mark.clear()
+
+    def trigger_next_collection(self):
+        """In the mutator thread: triggers the next collection."""
+        #
+        # In case the previous collection is not over yet, wait for it
+        self.wait_for_the_end_of_collection()
+        #
+        # Scan the stack roots and the refs in non-GC objects
+        self.root_walker.walk_roots(
+            MostlyConcurrentMarkSweepGC._add_stack_root,  # stack roots
+            MostlyConcurrentMarkSweepGC._add_stack_root,  # in prebuilt non-gc
+            None)                         # static in prebuilt gc
+        #
+        # Invert this global variable, which has the effect that on all
+        # objects' state go instantly from "marked" to "non marked"
+        self.current_mark ^= GCFLAG_MARK_TOGGLE
+        #
+        # Start the collector thread
+        self.collection_running = True
+        self.ready_to_start_lock.release()
+
+    def _add_stack_root(self, root):
+        obj = root.address[0]
+        self.gray_objects.append(obj)
+
+    def acquire(self, lock):
+        if we_are_translated():
+            lock.acquire(True)
+        else:
+            while not lock.acquire(False):
+                time.sleep(0.001)
+                # ---------- EXCEPTION FROM THE COLLECTOR THREAD ----------
+                if hasattr(self, '_exc_info'):
+                    self._reraise_from_collector_thread()
+
+    def _reraise_from_collector_thread(self):
+        exc, val, tb = self._exc_info
+        raise exc, val, tb
+
+
+    def collector_run(self):
+        """Main function of the collector's thread."""
+        try:
+            while True:
+                #
+                # Wait for the lock to be released
+                self.acquire(self.ready_to_start_lock)
+                #
+                # For tests: detect when we have to shut down
+                if not we_are_translated():
+                    if self._teardown_now:
+                        self.finished_lock.release()
+                        break
+                #
+                # Mark
+                self.collector_mark()
+                #
+                # Sweep
+                self.collector_sweep()
+        except Exception, e:
+            print 'Crash!', e.__class__.__name__, e
+            self._exc_info = sys.exc_info()
+
+    @specialize.arg(2)
+    def is_marked(self, obj, current_mark):
+        return (self.header(obj).tid & GCFLAG_MARK_TOGGLE) == current_mark
+
+    @specialize.arg(2)
+    def set_mark_flag(self, obj, current_mark):
+        if current_mark:
+            self.header(obj).tid |= GCFLAG_MARK_TOGGLE
+        else:
+            self.header(obj).tid &= ~GCFLAG_MARK_TOGGLE
+
+    def collector_mark(self):
+        while True:
+            #
+            # Do marking.  The following function call is interrupted
+            # if the mutator's write barrier adds new objects to
+            # 'extra_objects_to_mark'.
+            if self.current_mark:
+                self._collect_mark(GCFLAG_MARK_TOGGLE)
+            else:
+                self._collect_mark(0)
+            #
+            # Move the objects from 'extra_objects_to_mark' to
+            # 'gray_objects'.  This requires the mutex lock.
+            # There are typically only a few objects to move here,
+            # unless XXX we've hit the write barrier of a large array
+            self.mutex_lock.acquire(True)
+            while self.extra_objects_to_mark.non_empty():
+                obj = self.extra_objects_to_mark.pop()
+                self.gray_objects.append(obj)
+            self.mutex_lock.release()
+            #
+            # If 'gray_objects' is empty, we are done: there should be
+            # no possible case in which more objects are being added to
+            # 'extra_objects_to_mark' concurrently, because 'gray_objects'
+            # and 'extra_objects_to_mark' were already empty before we
+            # acquired the 'mutex_lock', so all reachable objects have
+            # been marked.
+            if not self.gray_objects.non_empty():
+                return
+
+    @specialize.arg(1)
+    def _collect_mark(self, current_mark):
+        while self.gray_objects.non_empty():
+            obj = self.gray_objects.pop()
+            if not self.is_marked(obj, current_mark):
+                self.set_mark_flag(obj, current_mark)
+                self.trace(obj, self._collect_add_pending, None)
+                #
+                # Interrupt early if the mutator's write barrier adds stuff
+                # to that list.  Note that the check is imprecise because
+                # it is not lock-protected, but that's good enough.  The
+                # idea is that we trace in priority objects flagged with
+                # the write barrier, because they are more likely to
+                # reference further objects that will soon be accessed too.
+                if self.extra_objects_to_mark.non_empty():
+                    return
+
+    def _collect_add_pending(self, root, ignored):
+        self.gray_objects.append(root.address[0])
+
+    def collector_sweep(self):
+        xxx
diff --git a/pypy/rpython/memory/support.py b/pypy/rpython/memory/support.py
--- a/pypy/rpython/memory/support.py
+++ b/pypy/rpython/memory/support.py
@@ -114,6 +114,13 @@
                 self.shrink()
             return result
 
+        def clear(self):
+            while self.chunk.next:
+                next = self.chunk.next
+                unused_chunks.put(self.chunk)
+                self.chunk = next
+            self.used_in_last_chunk = 0
+
         def delete(self):
             cur = self.chunk
             while cur:
diff --git a/pypy/rpython/memory/test/test_support.py 
b/pypy/rpython/memory/test/test_support.py
--- a/pypy/rpython/memory/test/test_support.py
+++ b/pypy/rpython/memory/test/test_support.py
@@ -94,6 +94,21 @@
             assert a == addrs[i]
         assert not ll.non_empty()
 
+    def test_clear(self):
+        AddressStack = get_address_stack()
+        addrs = [raw_malloc(llmemory.sizeof(lltype.Signed))
+                 for i in range(2200)]
+        ll = AddressStack()
+        for i in range(2200):
+            ll.append(addrs[i])
+        ll.clear()
+        assert not ll.non_empty()
+        ll.append(addrs[0])
+        assert ll.non_empty()
+        x = ll.pop()
+        assert x == addrs[0]
+        assert not ll.non_empty()
+
 
 class TestAddressDeque:
     def test_big_access(self):
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to