Hi all,

As part of RunGenCollect() (in src/main/memory.c), some maintenance on the 
CHARSXP cache is done: unmarked nodes/CHARSXPs are removed from the hash 
chains. This always requires touching all CHARSXPs in the cache, irrespective 
of the number of generations which were just garbage collected. In a session 
with a big CHARSXP cache, this significantly slows down gc, even when only 
the youngest generation is collected.

However, this part of RunGenCollect() seems to be one of the few which can 
easily be parallelized without the need for thread synchronization. And it 
seems to be the one that profits most from parallelization.

Attached patch (parallel_CHARSXP_cache.diff) implements parallelization over 
the elements of R_StringHash and gives the following performance improvements 
on my system when using 4 threads compared to R devel (revision 81008):

Elapsed time for 200 non-full gc in a session after

x <- as.character(runif(1e6))[]
gc(full = TRUE)

8sec -> 2.5sec.

AND

Elapsed time for five non-full gc in a session after

x <- as.character(runif(5e7))[]
gc(full = TRUE)

21sec -> 6sec.

In the patch, I dropped the two lines 

FORWARD_NODE(s);
FORWARD_NODE(CXHEAD(s));

because they are currently both no-ops (and would require synchronization if 
they were not). They are no-ops because we have

# define CXHEAD(x) (x)  // in Defn.h

and hence FORWARD_NODE(s)/FORWARD_NODE(CXHEAD(s)) is only called when s is 
already marked, in which case FORWARD_NODE() does nothing.

I used OpenMP despite the known issues of some of its implementations with 
hanging after a fork, mostly because it was the easiest thing to do for a PoC. 
I worked around this in a similar way to e.g. data.table, by using only one 
thread in forked children.

It might be worth considering making the parallelization conditional on the 
size of the CHARSXP cache and using only the main thread if the cache is 
(still) small.

In the second attached patch (parallel_CHARSXP_cache_no_forward.diff) I 
additionally no longer call FORWARD_NODE(R_StringHash) because this would make 
the following call to PROCESS_NODES() iterate through all elements of 
R_StringHash again, which is unnecessary since all elements are either 
R_NilValue or an already marked CHARSXP. Instead, I directly mark & snap 
R_StringHash. In contrast to the parallelization, this only affects full gcs 
since R_StringHash will quickly belong to the oldest generation.

Attached gc_test.R is the script I used to get the previously mentioned and 
more gc timings.

To me this looks like a significant performance improvement, especially given 
the small changeset. What do you think?

Best regards,
Andreas
Index: src/main/memory.c
===================================================================
--- src/main/memory.c	(revision 81008)
+++ src/main/memory.c	(working copy)
@@ -90,6 +90,7 @@
 #include <R_ext/Rallocators.h> /* for R_allocator_t structure */
 #include <Rmath.h> // R_pow_di
 #include <Print.h> // R_print
+#include <pthread.h> // pthread_atfork
 
 /* malloc uses size_t.  We are assuming here that size_t is at least
    as large as unsigned long.  Changed from int at 1.6.0 to (i) allow
@@ -100,6 +101,22 @@
 static int gc_reporting = 0;
 static int gc_count = 0;
 
+static int gc_threads = 4;
+static int pre_fork_gc_threads;
+
+void when_fork() {
+    pre_fork_gc_threads = gc_threads;
+    gc_threads = 1;
+}
+
+void after_fork() {
+    gc_threads = pre_fork_gc_threads;
+}
+
+void avoid_openmp_hang_within_fork() {
+    pthread_atfork(&when_fork, &after_fork, NULL);
+}
+
 /* Report error encountered during garbage collection where for detecting
    problems it is better to abort, but for debugging (or some production runs,
    where external validation of results is possible) it may be preferred to
@@ -406,6 +423,18 @@
     }
 }
 
+static void init_gc_threads()
+{
+    char *arg;
+
+    arg = getenv("R_GC_THREADS");
+
+    if (arg != NULL) {
+        int threads = (int) atof(arg);
+        if (threads >= 1) gc_threads = threads;
+    }
+}
+
 /* Maximal Heap Limits.  These variables contain upper limits on the
    heap sizes.  They could be made adjustable from the R level,
    perhaps by a handler for a recoverable error.
@@ -1815,6 +1844,7 @@
     {
 	SEXP t;
 	int nc = 0;
+#pragma omp parallel for num_threads(gc_threads) reduction(+:nc) private(s, t) schedule(static)
 	for (i = 0; i < LENGTH(R_StringHash); i++) {
 	    s = VECTOR_ELT(R_StringHash, i);
 	    t = R_NilValue;
@@ -1827,8 +1857,6 @@
 		    s = CXTAIL(s);
 		    continue;
 		}
-		FORWARD_NODE(s);
-		FORWARD_NODE(CXHEAD(s));
 		t = s;
 		s = CXTAIL(s);
 	    }
@@ -2124,6 +2152,7 @@
 
     init_gctorture();
     init_gc_grow_settings();
+    init_gc_threads();
 
     arg = getenv("_R_GC_FAIL_ON_ERROR_");
     if (arg != NULL && StringTrue(arg))
@@ -2215,6 +2244,8 @@
     R_LogicalNAValue = allocVector(LGLSXP, 1);
     LOGICAL(R_LogicalNAValue)[0] = NA_LOGICAL;
     MARK_NOT_MUTABLE(R_LogicalNAValue);
+    
+    avoid_openmp_hang_within_fork();
 }
 
 /* Since memory allocated from the heap is non-moving, R_alloc just
Index: src/main/memory.c
===================================================================
--- src/main/memory.c	(revision 81008)
+++ src/main/memory.c	(working copy)
@@ -90,6 +90,7 @@
 #include <R_ext/Rallocators.h> /* for R_allocator_t structure */
 #include <Rmath.h> // R_pow_di
 #include <Print.h> // R_print
+#include <pthread.h> // pthread_atfork
 
 /* malloc uses size_t.  We are assuming here that size_t is at least
    as large as unsigned long.  Changed from int at 1.6.0 to (i) allow
@@ -100,6 +101,22 @@
 static int gc_reporting = 0;
 static int gc_count = 0;
 
+static int gc_threads = 4;
+static int pre_fork_gc_threads;
+
+void when_fork() {
+    pre_fork_gc_threads = gc_threads;
+    gc_threads = 1;
+}
+
+void after_fork() {
+    gc_threads = pre_fork_gc_threads;
+}
+
+void avoid_openmp_hang_within_fork() {
+    pthread_atfork(&when_fork, &after_fork, NULL);
+}
+
 /* Report error encountered during garbage collection where for detecting
    problems it is better to abort, but for debugging (or some production runs,
    where external validation of results is possible) it may be preferred to
@@ -406,6 +423,18 @@
     }
 }
 
+static void init_gc_threads()
+{
+    char *arg;
+
+    arg = getenv("R_GC_THREADS");
+
+    if (arg != NULL) {
+        int threads = (int) atof(arg);
+        if (threads >= 1) gc_threads = threads;
+    }
+}
+
 /* Maximal Heap Limits.  These variables contain upper limits on the
    heap sizes.  They could be made adjustable from the R level,
    perhaps by a handler for a recoverable error.
@@ -1815,6 +1844,7 @@
     {
 	SEXP t;
 	int nc = 0;
+#pragma omp parallel for num_threads(gc_threads) reduction(+:nc) private(s, t) schedule(static)
 	for (i = 0; i < LENGTH(R_StringHash); i++) {
 	    s = VECTOR_ELT(R_StringHash, i);
 	    t = R_NilValue;
@@ -1827,8 +1857,6 @@
 		    s = CXTAIL(s);
 		    continue;
 		}
-		FORWARD_NODE(s);
-		FORWARD_NODE(CXHEAD(s));
 		t = s;
 		s = CXTAIL(s);
 	    }
@@ -1835,9 +1863,17 @@
 	    if(VECTOR_ELT(R_StringHash, i) != R_NilValue) nc++;
 	}
 	SET_TRUELENGTH(R_StringHash, nc); /* SET_HASHPRI, really */
+
+	/* instead of 'FORWARD_NODE(R_StringHash); PROCESS_NODES();' to avoid
+	   iterating over all its elements again which are all already marked*/
+	if (! NODE_IS_MARKED(R_StringHash)) {
+	    CHECK_FOR_FREE_NODE(R_StringHash)
+	    MARK_NODE(R_StringHash);
+	    UNSNAP_NODE(R_StringHash);
+	    SNAP_NODE(R_StringHash, R_GenHeap[NODE_CLASS(R_StringHash)].Old[NODE_GENERATION(R_StringHash)]);
+	    R_GenHeap[NODE_CLASS(R_StringHash)].OldCount[NODE_GENERATION(R_StringHash)]++;
     }
-    FORWARD_NODE(R_StringHash);
-    PROCESS_NODES();
+    }
 
 #ifdef PROTECTCHECK
     for(i=0; i< NUM_SMALL_NODE_CLASSES;i++){
@@ -2124,6 +2160,7 @@
 
     init_gctorture();
     init_gc_grow_settings();
+    init_gc_threads();
 
     arg = getenv("_R_GC_FAIL_ON_ERROR_");
     if (arg != NULL && StringTrue(arg))
@@ -2215,6 +2252,8 @@
     R_LogicalNAValue = allocVector(LGLSXP, 1);
     LOGICAL(R_LogicalNAValue)[0] = NA_LOGICAL;
     MARK_NOT_MUTABLE(R_LogicalNAValue);
+
+    avoid_openmp_hang_within_fork();
 }
 
 /* Since memory allocated from the heap is non-moving, R_alloc just
______________________________________________
R-devel@r-project.org mailing list
https://stat.ethz.ch/mailman/listinfo/r-devel

Reply via email to