On Fri, 8 Mar 2024, Ivan Krylov via R-help wrote:
Hello R-help,
I've noticed that my 'parallel' jobs take too much memory to store and
transfer to the cluster workers. I've managed to trace it to the
following:
# `payload` is being written to the cluster worker.
# The function FUN had been created as a closure inside my package:
payload$data$args$FUN
# function (l, ...)
# withCallingHandlers(fun(l$x, ...), error = .wraperr(l$name))
#
#
# The function seems to bring a lot of captured data with it.
e <- environment(payload$data$args$FUN)
length(serialize(e, NULL))
# [1] 738202878
parent.env(e)
#
# The parent environment has a name, so it all must be right here.
# What is it?
ls(e, all.names = TRUE)
# [1] "fun"
length(serialize(e$fun, NULL))
# [1] 317
# The only object in the environment is small!
# Where is the 700 megabytes of data?
length(serialize(e, NULL))
# [1] 536
length(serialize(payload$data$args$FUN, NULL))
# [1] 1722
And once I've observed `fun`, the environment becomes very small and
can now be serialized very compactly.
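The effect can be reproduced without the package. The sketch below assumes only base R and uses a hypothetical wrapper factory `mkwrap()` and caller `g()` (neither is from the original post). The argument `fun` is never forced before the closure is returned, so the promise keeps a reference to the caller's frame until the closure is first called.

```r
# Hypothetical factory: `fun` stays an unevaluated promise until first use.
mkwrap <- function(fun) function(...) fun(...)

g <- function(n) {
    big <- rnorm(n)   # large object that lives only in g()'s frame
    mkwrap(sqrt)      # the promise for `fun` records g()'s frame as its environment
}

w <- g(1e6)
before <- length(serialize(w, NULL))  # includes `big`, reached via the promise environment
w(4)                                  # forces the promise; R then drops its environment
after <- length(serialize(w, NULL))   # only the forced value (sqrt) remains
c(before = before, after = after)
```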
I managed to work around it by forcing the promise and explicitly
putting `fun` in a small environment when constructing the closure:
.wrapfun <- function(fun) {
    e <- new.env(parent = loadNamespace('mypackage'))
    e$fun <- fun
    # NOTE: a naive return(function(...)) could serialize to 700
    # megabytes due to `fun` seemingly being a promise (?). Once the
    # promise is resolved, suddenly `fun` is much more compact.
    ret <- function(l, ...) withCallingHandlers(
        fun(l$x, ...),
        error = .wraperr(l$name)
    )
    environment(ret) <- e
    ret
}
Creating and setting environments is brittle and easy to get wrong. I
prefer to use a combination of proper lexical scoping, regular
assignments, and force() as I do below.
Is this analysis correct? Could a simple f <- force(fun) have sufficed?
Where can I read more about this type of problems?
Just force(fun), without the assignment, should be enough, or even
just fun, as in
function(fun) { fun; }
Using force() makes the intent clearer.
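Applied to a generic wrapper, the suggestion might look like the sketch below. `mkwrap_forced()` and `g2()` are hypothetical names used only for illustration; the sketch assumes base R and that the wrapped function does not itself close over large data.

```r
# Hypothetical factory that forces its argument before building the closure.
mkwrap_forced <- function(fun) {
    force(fun)                # evaluate now; the promise's environment is dropped
    function(...) fun(...)
}

g2 <- function(n) {
    big <- rnorm(n)           # large object that should not travel with the closure
    mkwrap_forced(sqrt)
}

w2 <- g2(1e6)
size <- length(serialize(w2, NULL))  # small: `big` is not reachable from w2
size
w2(4)
```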
Closures or formulas capturing large amounts of data are something you
have to be careful about with serialization in general and with
distributed memory computing in R in particular. There is a little on
it in the parallel vignette. I know I have talked and written about it
in various places but can't remember a specific reference right now.
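For formulas the captured object is the environment in which the formula was created. A minimal sketch, using a hypothetical `mkformula()` and assuming the formula's variables will later be supplied through a `data` argument, so that re-pointing its environment at the global environment is safe:

```r
# A formula records the environment it was created in.
mkformula <- function(n) {
    big <- rnorm(n)   # large local that the formula does not need
    y ~ x             # `~` stores this function's frame as the formula's environment
}

fo <- mkformula(1e6)
captured <- length(serialize(fo, NULL))  # includes `big`

# Assumption: variables come from a data argument later, so the formula's
# environment can be the global environment, which serialize() writes as
# a reference rather than by value.
environment(fo) <- globalenv()
trimmed <- length(serialize(fo, NULL))
c(captured = captured, trimmed = trimmed)
```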
I usually define a top level function to create any closures I want to
transmit and make sure they only capture what they need. A common
pattern is provided by a simple function for creating a normal
log-likelihood:
mkLL <- function(x) {
    m <- mean(x)
    s <- sd(x)
    function(y) sum(dnorm(y, m, s, log = TRUE))
}
This avoids recomputing the mean and sd on every call. It is fine for
use within a single process, and the fact that the original data is
available in the environment might even be useful for debugging:
ll <- mkLL(rnorm(10))
environment(ll)$x
## [1] -0.09202453 0.78901912 -0.66744232 1.36061149 1.50768816
## [6] -2.60754997 0.68727212 0.31557476 2.02027688 -1.42361769
But it does prevent the data from being garbage-collected until
the returned result is no longer reachable. A more GC-friendly and
serialization-friendly definition is
mkLL <- function(x) {
    m <- mean(x)
    s <- sd(x)
    x <- NULL ## not needed anymore; remove from the result's enclosing env
    function(y) sum(dnorm(y, m, s, log = TRUE))
}
ll <- mkLL(rnorm(1e7))
length(serialize(ll, NULL))
## [1] 734
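Removing the binding with rm() instead of assigning NULL should have the same effect, since either way nothing in the closure's environment refers to the data vector any more. This is a swapped-in variant, not the definition above; `mkLL_rm()` is a hypothetical name, and the sketch uses a smaller vector to keep it light.

```r
# Hypothetical variant of the GC-friendly mkLL() that removes the binding.
mkLL_rm <- function(x) {
    m <- mean(x)
    s <- sd(x)
    rm(x)   # drop the binding to the (possibly large) data vector
    function(y) sum(dnorm(y, m, s, log = TRUE))
}

ll_rm <- mkLL_rm(rnorm(1e6))
length(serialize(ll_rm, NULL))                       # small
exists("x", envir = environment(ll_rm), inherits = FALSE)
```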
If you prefer to calculate the mean and sd yourself, you could use
mkLL1 <- function(m, s) function(x) sum(dnorm(x, m, s, log = TRUE))
Until the result is called for the first time, the evaluation of the
arguments is delayed, i.e. encoded in promises that record the
expression to evaluate and the environment in which to evaluate
it:
f <- function(n) {
    x <- rnorm(n)
    mkLL1(mean(x), sd(x))
}
ll <- f(1e7)
length(serialize(ll, NULL))
## [1] 80002223
Once the arguments are evaluated, the expressions are still needed for
substitute() and such, but the environment is not, so it is dropped;
if the promise environment can no longer be reached, it can be
garbage-collected. It will also no longer appear in a serialization:
ll(1)
## [1] -1.419588
length(serialize(ll, NULL))
## [1] 3537
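That the expression survives forcing can be checked with substitute(), which returns a promise's expression when given the environment holding it. The sketch repeats the `mkLL1()` and `f()` definitions from the text, with a small `n` to keep it cheap.

```r
mkLL1 <- function(m, s) function(x) sum(dnorm(x, m, s, log = TRUE))
f <- function(n) {
    x <- rnorm(n)
    mkLL1(mean(x), sd(x))
}

ll <- f(100)
ll(1)                            # forces the promises for `m` and `s`
substitute(m, environment(ll))   # the recorded expression is still available
```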
Having a reference to a large environment is not much of an issue
within a single process, but can be in a distributed memory parallel
computing context. To avoid this you can force evaluation of the
promises:
mkLL1 <- function(m, s) {
    force(m)
    force(s)
    function(x) sum(dnorm(x, m, s, log = TRUE))
}
ll <- f(1e7)
length(serialize(ll, NULL))
## [1] 2146
The possibility of inadvertently transferring too much data is an
issue in distributed memory computing in general, so there are various
tools that help. A very