Author: Armin Rigo <[email protected]>
Branch:
Changeset: r1078:0e7ff5304cac
Date: 2014-03-19 10:20 +0100
http://bitbucket.org/pypy/stmgc/changeset/0e7ff5304cac/
Log: hg merge c7-fork
Add pthread_atfork() to manually de-share the mapping, which is
created with MAP_SHARED.
diff --git a/c7/demo/demo_random.c b/c7/demo/demo_random.c
--- a/c7/demo/demo_random.c
+++ b/c7/demo/demo_random.c
@@ -4,6 +4,8 @@
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
+#include <sys/types.h>
+#include <sys/wait.h>
#include "stmgc.h"
@@ -12,12 +14,14 @@
#define THREAD_STARTS 1000 // how many restarts of threads
#define PREBUILT_ROOTS 3
#define MAXROOTS 1000
+#define FORKS 3
// SUPPORT
struct node_s;
typedef TLPREFIX struct node_s node_t;
typedef node_t* nodeptr_t;
typedef object_t* objptr_t;
+int num_forked_children = 0;
struct node_s {
struct object_s hdr;
@@ -335,19 +339,39 @@
if (p == (objptr_t)-1) {
push_roots();
- stm_commit_transaction();
- td.num_roots_at_transaction_start = td.num_roots;
+ if (arg == NULL) { /* common case */
+ stm_commit_transaction();
+ td.num_roots_at_transaction_start = td.num_roots;
+ if (get_rand(100) < 98) {
+ STM_START_TRANSACTION(&stm_thread_local, here);
+ } else {
+ stm_start_inevitable_transaction(&stm_thread_local);
+ }
+ td.num_roots = td.num_roots_at_transaction_start;
+ p = NULL;
+ pop_roots();
+ reload_roots();
+ }
+ else {
+ /* run a fork() inside the transaction */
+ printf("========== FORK =========\n");
+ arg = NULL;
+ pid_t child = fork();
+ printf("=== in process %d thread %lx, fork() returned %d\n",
+ (int)getpid(), (long)pthread_self(), (int)child);
+ if (child == -1) {
+ fprintf(stderr, "fork() error: %m\n");
+ abort();
+ }
+ if (child != 0)
+ num_forked_children++;
+ else
+ num_forked_children = 0;
- if (get_rand(100) < 98) {
- STM_START_TRANSACTION(&stm_thread_local, here);
- } else {
- stm_start_inevitable_transaction(&stm_thread_local);
+ pop_roots();
+ p = NULL;
}
- td.num_roots = td.num_roots_at_transaction_start;
- p = NULL;
- pop_roots();
- reload_roots();
}
}
stm_commit_transaction();
@@ -427,8 +451,24 @@
assert(status == 0);
printf("thread finished\n");
if (thread_starts) {
+ long forkbase = NUMTHREADS * THREAD_STARTS / (FORKS + 1);
+ long _fork = (thread_starts % forkbase) == 0;
thread_starts--;
- newthread(demo_random, NULL);
+ newthread(demo_random, (void *)_fork);
+ }
+ }
+
+ for (i = 0; i < num_forked_children; i++) {
+ pid_t child = wait(&status);
+ if (child == -1)
+ perror("wait");
+ printf("From %d: child %d terminated with exit status %d\n",
+ (int)getpid(), (int)child, status);
+ if (WIFEXITED(status) && WEXITSTATUS(status) == 0)
+ ;
+ else {
+ printf("*** error from the child ***\n");
+ return 1;
}
}
diff --git a/c7/stm/forksupport.c b/c7/stm/forksupport.c
new file mode 100644
--- /dev/null
+++ b/c7/stm/forksupport.c
@@ -0,0 +1,295 @@
+#ifndef _STM_CORE_H_
+# error "must be compiled via stmgc.c"
+#endif
+
+
+/* XXX this is currently not doing copy-on-write, but simply forces a
+ copy of all pages as soon as fork() is called. */
+
+
+static char *fork_big_copy = NULL;
+static stm_thread_local_t *fork_this_tl;
+static bool fork_was_in_transaction;
+
+static char *setup_mmap(char *reason); /* forward, in setup.c */
+static pthread_t *_get_cpth(stm_thread_local_t *);/* forward, in setup.c */
+
+
+static bool page_is_null(char *p)
+{
+ long *q = (long *)p;
+ long i;
+ for (i = 0; i < 4096 / sizeof(long); i++)
+ if (q[i] != 0)
+ return false;
+ return true;
+}
+
+
+static void forksupport_prepare(void)
+{
+ if (stm_object_pages == NULL)
+ return;
+
+ /* So far we attempt to check this by walking all stm_thread_local_t,
+ marking the one from the current thread, and verifying that it's not
+ running a transaction. This assumes that the stm_thread_local_t is just
+ a __thread variable, so never changes threads.
+ */
+ s_mutex_lock();
+
+ dprintf(("forksupport_prepare\n"));
+
+ stm_thread_local_t *this_tl = NULL;
+ stm_thread_local_t *tl = stm_all_thread_locals;
+ do {
+ if (pthread_equal(*_get_cpth(tl), pthread_self())) {
+ if (this_tl != NULL)
+ stm_fatalerror("fork(): found several stm_thread_local_t"
+ " from the same thread");
+ this_tl = tl;
+ }
+ tl = tl->next;
+ } while (tl != stm_all_thread_locals);
+
+ if (this_tl == NULL)
+ stm_fatalerror("fork(): found no stm_thread_local_t from this thread");
+ s_mutex_unlock();
+
+ bool was_in_transaction = _stm_in_transaction(this_tl);
+ if (was_in_transaction) {
+ stm_become_inevitable("fork");
+ /* Note that the line above can still fail and abort, which should
+ be fine */
+ }
+ else {
+ stm_start_inevitable_transaction(this_tl);
+ }
+
+ s_mutex_lock();
+ synchronize_all_threads();
+ mutex_pages_lock();
+
+ /* Make a new mmap at some other address, but of the same size as
+ the standard mmap at stm_object_pages
+ */
+ char *big_copy = setup_mmap("stmgc's fork support");
+
+ /* Copy each of the segment infos into the new mmap, nurseries,
+ and associated read markers
+ */
+ long i;
+ for (i = 1; i <= NB_SEGMENTS; i++) {
+ char *src, *dst;
+ struct stm_priv_segment_info_s *psrc = get_priv_segment(i);
+ dst = big_copy + (((char *)psrc) - stm_object_pages);
+ *(struct stm_priv_segment_info_s *)dst = *psrc;
+
+ src = get_segment_base(i) + FIRST_READMARKER_PAGE * 4096UL;
+ dst = big_copy + (src - stm_object_pages);
+ long j;
+ for (j = 0; j < END_NURSERY_PAGE - FIRST_READMARKER_PAGE; j++) {
+ if (!page_is_null(src))
+ pagecopy(dst, src);
+ src += 4096;
+ dst += 4096;
+ }
+ }
+
+ /* Copy all the data from the two ranges of objects (large, small)
+ into the new mmap
+ */
+ uintptr_t pagenum, endpagenum;
+ pagenum = END_NURSERY_PAGE; /* starts after the nursery */
+ endpagenum = (uninitialized_page_start - stm_object_pages) / 4096UL;
+ if (endpagenum < NB_PAGES)
+ endpagenum++; /* the next page too, because it might contain
+ data from largemalloc */
+
+ while (1) {
+ if (UNLIKELY(pagenum == endpagenum)) {
+ /* we reach this point usually twice, because there are
+ more pages after 'uninitialized_page_stop' */
+ if (endpagenum == NB_PAGES)
+ break; /* done */
+ pagenum = (uninitialized_page_stop - stm_object_pages) / 4096UL;
+ pagenum--; /* the prev page too, because it does contain
+ data from largemalloc */
+ endpagenum = NB_PAGES;
+ }
+
+ char *src = stm_object_pages + pagenum * 4096UL;
+ char *dst = big_copy + pagenum * 4096UL;
+ pagecopy(dst, src);
+
+ struct page_shared_s ps = pages_privatized[pagenum - PAGE_FLAG_START];
+ if (ps.by_segment != 0) {
+ long j;
+ for (j = 0; j < NB_SEGMENTS; j++) {
+ src += NB_PAGES * 4096UL;
+ dst += NB_PAGES * 4096UL;
+ if (ps.by_segment & (1 << j)) {
+ pagecopy(dst, src);
+ }
+ }
+ }
+ pagenum++;
+ }
+
+ assert(fork_big_copy == NULL);
+ fork_big_copy = big_copy;
+ fork_this_tl = this_tl;
+ fork_was_in_transaction = was_in_transaction;
+
+ assert(_has_mutex());
+ dprintf(("forksupport_prepare: from %p %p\n", fork_this_tl,
+ fork_this_tl->creating_pthread[0]));
+}
+
+static void forksupport_parent(void)
+{
+ if (stm_object_pages == NULL)
+ return;
+
+ dprintf(("forksupport_parent: continuing to run %p %p\n", fork_this_tl,
+ fork_this_tl->creating_pthread[0]));
+ assert(_has_mutex());
+ assert(_is_tl_registered(fork_this_tl));
+
+ /* In the parent, after fork(), we can simply forget about the big copy
+ that we made for the child.
+ */
+ assert(fork_big_copy != NULL);
+ munmap(fork_big_copy, TOTAL_MEMORY);
+ fork_big_copy = NULL;
+ bool was_in_transaction = fork_was_in_transaction;
+
+ mutex_pages_unlock();
+ s_mutex_unlock();
+
+ if (!was_in_transaction) {
+ stm_commit_transaction();
+ }
+
+ dprintf(("forksupport_parent: continuing to run\n"));
+}
+
+static void fork_abort_thread(long i)
+{
+ struct stm_priv_segment_info_s *pr = get_priv_segment(i);
+ dprintf(("forksupport_child: abort in seg%ld\n", i));
+ assert(pr->pub.running_thread->associated_segment_num == i);
+ assert(pr->transaction_state == TS_REGULAR);
+ set_gs_register(get_segment_base(i));
+
+ stm_jmpbuf_t jmpbuf;
+ if (__builtin_setjmp(jmpbuf) == 0) {
+ pr->pub.jmpbuf_ptr = &jmpbuf;
+#ifndef NDEBUG
+ pr->running_pthread = pthread_self();
+#endif
+ stm_abort_transaction();
+ }
+}
+
+static void forksupport_child(void)
+{
+ if (stm_object_pages == NULL)
+ return;
+
+ /* this new process contains no other thread, so we can
+ just release these locks early */
+ mutex_pages_unlock();
+ s_mutex_unlock();
+
+ /* Move the copy of the mmap over the old one, overwriting it
+ and thus freeing the old mapping in this process
+ */
+ assert(fork_big_copy != NULL);
+ assert(stm_object_pages != NULL);
+ void *res = mremap(fork_big_copy, TOTAL_MEMORY, TOTAL_MEMORY,
+ MREMAP_MAYMOVE | MREMAP_FIXED,
+ stm_object_pages);
+ if (res != stm_object_pages)
+ stm_fatalerror("after fork: mremap failed: %m");
+ fork_big_copy = NULL;
+
+ /* Unregister all other stm_thread_local_t, mostly as a way to free
+ the memory used by the shadowstacks
+ */
+ while (stm_all_thread_locals->next != stm_all_thread_locals) {
+ if (stm_all_thread_locals == fork_this_tl)
+ stm_unregister_thread_local(stm_all_thread_locals->next);
+ else
+ stm_unregister_thread_local(stm_all_thread_locals);
+ }
+ assert(stm_all_thread_locals == fork_this_tl);
+
+ /* Make all pages shared again.
+ */
+ uintptr_t pagenum, endpagenum;
+ pagenum = END_NURSERY_PAGE; /* starts after the nursery */
+ endpagenum = (uninitialized_page_start - stm_object_pages) / 4096UL;
+
+ while (1) {
+ if (UNLIKELY(pagenum == endpagenum)) {
+ /* we reach this point usually twice, because there are
+ more pages after 'uninitialized_page_stop' */
+ if (endpagenum == NB_PAGES)
+ break; /* done */
+ pagenum = (uninitialized_page_stop - stm_object_pages) / 4096UL;
+ endpagenum = NB_PAGES;
+ if (endpagenum == NB_PAGES)
+ break; /* done */
+ }
+
+ struct page_shared_s ps = pages_privatized[pagenum - PAGE_FLAG_START];
+ long j;
+ for (j = 0; j < NB_SEGMENTS; j++) {
+ if (!(ps.by_segment & (1 << j))) {
+ _page_do_reshare(j + 1, pagenum);
+ }
+ }
+ pagenum++;
+ }
+
+ /* Force the interruption of other running segments
+ */
+ long i;
+ for (i = 1; i <= NB_SEGMENTS; i++) {
+ struct stm_priv_segment_info_s *pr = get_priv_segment(i);
+ if (pr->pub.running_thread != NULL &&
+ pr->pub.running_thread != fork_this_tl) {
+ fork_abort_thread(i);
+ }
+ }
+
+ /* Restore a few things: the new pthread_self(), and the %gs
+ register */
+ int segnum = fork_this_tl->associated_segment_num;
+ assert(1 <= segnum && segnum <= NB_SEGMENTS);
+ *_get_cpth(fork_this_tl) = pthread_self();
+ set_gs_register(get_segment_base(segnum));
+ assert(STM_SEGMENT->segment_num == segnum);
+
+ if (!fork_was_in_transaction) {
+ stm_commit_transaction();
+ }
+
+ /* Done */
+ dprintf(("forksupport_child: running one thread now\n"));
+}
+
+
+static void setup_forksupport(void)
+{
+ static bool fork_support_ready = false;
+
+ if (!fork_support_ready) {
+ int res = pthread_atfork(forksupport_prepare, forksupport_parent,
+ forksupport_child);
+ if (res != 0)
+ stm_fatalerror("pthread_atfork() failed: %m");
+ fork_support_ready = true;
+ }
+}
diff --git a/c7/stm/fprintcolor.c b/c7/stm/fprintcolor.c
--- a/c7/stm/fprintcolor.c
+++ b/c7/stm/fprintcolor.c
@@ -8,8 +8,8 @@
char buffer[2048];
va_list ap;
int result;
- int size = (int)sprintf(buffer, "\033[%dm[%lx] ", dprintfcolor(),
- (long)pthread_self());
+ int size = (int)sprintf(buffer, "\033[%dm[%d,%lx] ", dprintfcolor(),
+ (int)getpid(), (long)pthread_self());
assert(size >= 0);
va_start(ap, format);
diff --git a/c7/stm/largemalloc.c b/c7/stm/largemalloc.c
--- a/c7/stm/largemalloc.c
+++ b/c7/stm/largemalloc.c
@@ -273,8 +273,10 @@
/* unlink the following chunk */
mscan->d.next->prev = mscan->d.prev;
mscan->d.prev->next = mscan->d.next;
- assert((mscan->prev_size = (size_t)-258, 1)); /* 0xfffffffffffffefe */
- assert((mscan->size = (size_t)-515, 1)); /* 0xfffffffffffffdfd */
+#ifndef NDEBUG
+ mscan->prev_size = (size_t)-258; /* 0xfffffffffffffefe */
+ mscan->size = (size_t)-515; /* 0xfffffffffffffdfd */
+#endif
/* merge the two chunks */
assert(fsize == fscan->prev_size);
diff --git a/c7/stm/nursery.c b/c7/stm/nursery.c
--- a/c7/stm/nursery.c
+++ b/c7/stm/nursery.c
@@ -32,10 +32,6 @@
}
}
-static void teardown_nursery(void)
-{
-}
-
static inline bool _is_in_nursery(object_t *obj)
{
assert((uintptr_t)obj >= NURSERY_START);
diff --git a/c7/stm/pages.c b/c7/stm/pages.c
--- a/c7/stm/pages.c
+++ b/c7/stm/pages.c
@@ -103,6 +103,8 @@
segment 0. */
uintptr_t i;
assert(_has_mutex_pages());
+ if (count == 0)
+ return;
for (i = 1; i <= NB_SEGMENTS; i++) {
char *segment_base = get_segment_base(i);
d_remap_file_pages(segment_base + pagenum * 4096UL,
@@ -140,6 +142,13 @@
mutex_pages_unlock();
}
+static void _page_do_reshare(long segnum, uintptr_t pagenum)
+{
+ char *segment_base = get_segment_base(segnum);
+ d_remap_file_pages(segment_base + pagenum * 4096UL,
+ 4096, pagenum);
+}
+
static void page_reshare(uintptr_t pagenum)
{
struct page_shared_s ps = pages_privatized[pagenum - PAGE_FLAG_START];
diff --git a/c7/stm/pages.h b/c7/stm/pages.h
--- a/c7/stm/pages.h
+++ b/c7/stm/pages.h
@@ -38,7 +38,9 @@
static void pages_initialize_shared(uintptr_t pagenum, uintptr_t count);
static void page_privatize(uintptr_t pagenum);
static void page_reshare(uintptr_t pagenum);
+static void _page_do_reshare(long segnum, uintptr_t pagenum);
+/* Note: don't ever do "mutex_pages_lock(); mutex_lock()" in that order */
static void mutex_pages_lock(void);
static void mutex_pages_unlock(void);
static bool _has_mutex_pages(void) __attribute__((unused));
diff --git a/c7/stm/setup.c b/c7/stm/setup.c
--- a/c7/stm/setup.c
+++ b/c7/stm/setup.c
@@ -3,6 +3,17 @@
#endif
+static char *setup_mmap(char *reason)
+{
+ char *result = mmap(NULL, TOTAL_MEMORY,
+ PROT_READ | PROT_WRITE,
+ MAP_PAGES_FLAGS, -1, 0);
+ if (result == MAP_FAILED)
+ stm_fatalerror("%s failed: %m\n", reason);
+
+ return result;
+}
+
void stm_setup(void)
{
/* Check that some values are acceptable */
@@ -20,13 +31,9 @@
(FIRST_READMARKER_PAGE * 4096UL));
assert(_STM_FAST_ALLOC <= NB_NURSERY_PAGES * 4096);
- stm_object_pages = mmap(NULL, TOTAL_MEMORY,
- PROT_READ | PROT_WRITE,
- MAP_PAGES_FLAGS, -1, 0);
- if (stm_object_pages == MAP_FAILED)
- stm_fatalerror("initial stm_object_pages mmap() failed: %m");
+ stm_object_pages = setup_mmap("initial stm_object_pages mmap()");
- /* The segment 0 is not used to run transactions, but to contain the
+ /* The segment 0 is not used to run transactions, but contains the
shared copy of the pages. We mprotect all pages before so that
accesses fail, up to and including the pages corresponding to the
nurseries of the other segments. */
@@ -83,6 +90,7 @@
setup_nursery();
setup_gcpage();
setup_pages();
+ setup_forksupport();
}
void stm_teardown(void)
@@ -110,11 +118,10 @@
teardown_core();
teardown_sync();
teardown_gcpage();
- teardown_nursery();
teardown_pages();
}
-void _init_shadow_stack(stm_thread_local_t *tl)
+static void _init_shadow_stack(stm_thread_local_t *tl)
{
struct stm_shadowentry_s *s = (struct stm_shadowentry_s *)
malloc(SHADOW_STACK_SIZE * sizeof(struct stm_shadowentry_s));
@@ -123,13 +130,18 @@
tl->shadowstack_base = s;
}
-void _done_shadow_stack(stm_thread_local_t *tl)
+static void _done_shadow_stack(stm_thread_local_t *tl)
{
free(tl->shadowstack_base);
tl->shadowstack = NULL;
tl->shadowstack_base = NULL;
}
+static pthread_t *_get_cpth(stm_thread_local_t *tl)
+{
+ assert(sizeof(pthread_t) <= sizeof(tl->creating_pthread));
+ return (pthread_t *)(tl->creating_pthread);
+}
void stm_register_thread_local(stm_thread_local_t *tl)
{
@@ -153,6 +165,7 @@
numbers automatically. */
num = (num % NB_SEGMENTS) + 1;
tl->associated_segment_num = num;
+ *_get_cpth(tl) = pthread_self();
_init_shadow_stack(tl);
set_gs_register(get_segment_base(num));
s_mutex_unlock();
@@ -161,6 +174,7 @@
void stm_unregister_thread_local(stm_thread_local_t *tl)
{
s_mutex_lock();
+ assert(tl->prev != NULL);
assert(tl->next != NULL);
_done_shadow_stack(tl);
if (tl == stm_all_thread_locals) {
diff --git a/c7/stmgc.c b/c7/stmgc.c
--- a/c7/stmgc.c
+++ b/c7/stmgc.c
@@ -23,6 +23,7 @@
#include "stm/largemalloc.c"
#include "stm/nursery.c"
#include "stm/sync.c"
+#include "stm/forksupport.c"
#include "stm/setup.c"
#include "stm/hash_id.c"
#include "stm/core.c"
diff --git a/c7/stmgc.h b/c7/stmgc.h
--- a/c7/stmgc.h
+++ b/c7/stmgc.h
@@ -69,6 +69,7 @@
/* the next fields are handled internally by the library */
int associated_segment_num;
struct stm_thread_local_s *prev, *next;
+ void *creating_pthread[2];
} stm_thread_local_t;
/* this should use llvm's coldcc calling convention,
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit