Hi all,
As part of RunGenCollect() (in src/main/memory.c), some maintenance on the
CHARSXP cache is done, namely unmarked nodes/CHARSXPs are removed from the hash
chains. This requires always touching all CHARSXPs in the cache, irrespective of
the number of generations which were just garbage collected. In a session with
a big CHARSXP cache, this will significantly slow down gc even when only the
youngest generation is being collected.
However, this part of RunGenCollect() seems to be one of the few which can
easily be parallelized without the need for thread synchronization. And it
seems to be the part profiting most from parallelization.
Attached patch (parallel_CHARSXP_cache.diff) implements parallelization over
the elements of R_StringHash and gives the following performance improvements
on my system when using 4 threads compared to R devel (revision 81008):
Elapsed time for 200 non-full gc in a session after
x <- as.character(runif(1e6))[]
gc(full = TRUE)
8sec -> 2.5sec.
AND
Elapsed time for five non-full gc in a session after
x <- as.character(runif(5e7))[]
gc(full = TRUE)
21sec -> 6sec.
In the patch, I dropped the two lines
FORWARD_NODE(s);
FORWARD_NODE(CXHEAD(s));
because they are currently both no-ops (and would require synchronization if
they were not). They are no-ops because we have
# define CXHEAD(x) (x) // in Defn.h
and hence FORWARD_NODE(s)/FORWARD_NODE(CXHEAD(s)) is only called when s is
already marked, in which case FORWARD_NODE() does nothing.
I used OpenMP despite the known issues some of its implementations have with
hanging after a fork, mostly because it was the easiest thing to do for a PoC.
I worked around this similarly to e.g. data.table, by using only one thread in
forked children.
It might be worth considering making the parallelization conditional on the
size of the CHARSXP cache and use only the main thread if the cache is (still)
small.
In the second attached patch (parallel_CHARSXP_cache_no_forward.diff) I
additionally no longer call FORWARD_NODE(R_StringHash) because this will make
the following call to PROCESS_NODES() iterate through all elements of
R_StringHash again, which is unnecessary since all elements are either
R_NilValue or an already marked CHARSXP. Instead, I directly mark & snap
R_StringHash. In contrast to the parallelization, this only affects full gcs,
since R_StringHash will quickly belong to the oldest generation.
Attached gc_test.R is the script I used to obtain the gc timings mentioned
above, as well as some additional ones.
To me this looks like a significant performance improvement, especially given
the little changeset. What do you think?
Best regards,
Andreas
Index: src/main/memory.c
===================================================================
--- src/main/memory.c (revision 81008)
+++ src/main/memory.c (working copy)
@@ -90,6 +90,7 @@
#include <R_ext/Rallocators.h> /* for R_allocator_t structure */
#include <Rmath.h> // R_pow_di
#include <Print.h> // R_print
+#include <pthread.h> // pthread_atfork
/* malloc uses size_t. We are assuming here that size_t is at least
as large as unsigned long. Changed from int at 1.6.0 to (i) allow
@@ -100,6 +101,22 @@
static int gc_reporting = 0;
static int gc_count = 0;
+static int gc_threads = 4;
+static int pre_fork_gc_threads;
+
+void when_fork() {
+ pre_fork_gc_threads = gc_threads;
+ gc_threads = 1;
+}
+
+void after_fork() {
+ gc_threads = pre_fork_gc_threads;
+}
+
+void avoid_openmp_hang_within_fork() {
+ pthread_atfork(&when_fork, &after_fork, NULL);
+}
+
/* Report error encountered during garbage collection where for detecting
problems it is better to abort, but for debugging (or some production runs,
where external validation of results is possible) it may be preferred to
@@ -406,6 +423,18 @@
}
}
+static void init_gc_threads()
+{
+ char *arg;
+
+ arg = getenv("R_GC_THREADS");
+
+ if (arg != NULL) {
+ int threads = (int) atof(arg);
+ if (threads >= 1) gc_threads = threads;
+ }
+}
+
/* Maximal Heap Limits. These variables contain upper limits on the
heap sizes. They could be made adjustable from the R level,
perhaps by a handler for a recoverable error.
@@ -1815,6 +1844,7 @@
{
SEXP t;
int nc = 0;
+#pragma omp parallel for num_threads(gc_threads) reduction(+:nc) private(s, t) schedule(static)
for (i = 0; i < LENGTH(R_StringHash); i++) {
s = VECTOR_ELT(R_StringHash, i);
t = R_NilValue;
@@ -1827,8 +1857,6 @@
s = CXTAIL(s);
continue;
}
- FORWARD_NODE(s);
- FORWARD_NODE(CXHEAD(s));
t = s;
s = CXTAIL(s);
}
@@ -2124,6 +2152,7 @@
init_gctorture();
init_gc_grow_settings();
+ init_gc_threads();
arg = getenv("_R_GC_FAIL_ON_ERROR_");
if (arg != NULL && StringTrue(arg))
@@ -2215,6 +2244,8 @@
R_LogicalNAValue = allocVector(LGLSXP, 1);
LOGICAL(R_LogicalNAValue)[0] = NA_LOGICAL;
MARK_NOT_MUTABLE(R_LogicalNAValue);
+
+ avoid_openmp_hang_within_fork();
}
/* Since memory allocated from the heap is non-moving, R_alloc just
Index: src/main/memory.c
===================================================================
--- src/main/memory.c (revision 81008)
+++ src/main/memory.c (working copy)
@@ -90,6 +90,7 @@
#include <R_ext/Rallocators.h> /* for R_allocator_t structure */
#include <Rmath.h> // R_pow_di
#include <Print.h> // R_print
+#include <pthread.h> // pthread_atfork
/* malloc uses size_t. We are assuming here that size_t is at least
as large as unsigned long. Changed from int at 1.6.0 to (i) allow
@@ -100,6 +101,22 @@
static int gc_reporting = 0;
static int gc_count = 0;
+static int gc_threads = 4;
+static int pre_fork_gc_threads;
+
+void when_fork() {
+ pre_fork_gc_threads = gc_threads;
+ gc_threads = 1;
+}
+
+void after_fork() {
+ gc_threads = pre_fork_gc_threads;
+}
+
+void avoid_openmp_hang_within_fork() {
+ pthread_atfork(&when_fork, &after_fork, NULL);
+}
+
/* Report error encountered during garbage collection where for detecting
problems it is better to abort, but for debugging (or some production runs,
where external validation of results is possible) it may be preferred to
@@ -406,6 +423,18 @@
}
}
+static void init_gc_threads()
+{
+ char *arg;
+
+ arg = getenv("R_GC_THREADS");
+
+ if (arg != NULL) {
+ int threads = (int) atof(arg);
+ if (threads >= 1) gc_threads = threads;
+ }
+}
+
/* Maximal Heap Limits. These variables contain upper limits on the
heap sizes. They could be made adjustable from the R level,
perhaps by a handler for a recoverable error.
@@ -1815,6 +1844,7 @@
{
SEXP t;
int nc = 0;
+#pragma omp parallel for num_threads(gc_threads) reduction(+:nc) private(s, t) schedule(static)
for (i = 0; i < LENGTH(R_StringHash); i++) {
s = VECTOR_ELT(R_StringHash, i);
t = R_NilValue;
@@ -1827,8 +1857,6 @@
s = CXTAIL(s);
continue;
}
- FORWARD_NODE(s);
- FORWARD_NODE(CXHEAD(s));
t = s;
s = CXTAIL(s);
}
@@ -1835,9 +1863,17 @@
if(VECTOR_ELT(R_StringHash, i) != R_NilValue) nc++;
}
SET_TRUELENGTH(R_StringHash, nc); /* SET_HASHPRI, really */
+
+ /* instead of 'FORWARD_NODE(R_StringHash); PROCESS_NODES();' to avoid
+ iterating over all its elements again which are all already marked*/
+ if (! NODE_IS_MARKED(R_StringHash)) {
+ CHECK_FOR_FREE_NODE(R_StringHash)
+ MARK_NODE(R_StringHash);
+ UNSNAP_NODE(R_StringHash);
+ SNAP_NODE(R_StringHash, R_GenHeap[NODE_CLASS(R_StringHash)].Old[NODE_GENERATION(R_StringHash)]);
+ R_GenHeap[NODE_CLASS(R_StringHash)].OldCount[NODE_GENERATION(R_StringHash)]++;
}
- FORWARD_NODE(R_StringHash);
- PROCESS_NODES();
+ }
#ifdef PROTECTCHECK
for(i=0; i< NUM_SMALL_NODE_CLASSES;i++){
@@ -2124,6 +2160,7 @@
init_gctorture();
init_gc_grow_settings();
+ init_gc_threads();
arg = getenv("_R_GC_FAIL_ON_ERROR_");
if (arg != NULL && StringTrue(arg))
@@ -2215,6 +2252,8 @@
R_LogicalNAValue = allocVector(LGLSXP, 1);
LOGICAL(R_LogicalNAValue)[0] = NA_LOGICAL;
MARK_NOT_MUTABLE(R_LogicalNAValue);
+
+ avoid_openmp_hang_within_fork();
}
/* Since memory allocated from the heap is non-moving, R_alloc just
______________________________________________
R-devel@r-project.org mailing list
https://stat.ethz.ch/mailman/listinfo/r-devel