Dear R-Devel List, **TL;DR:** The function **parLapplyLB** of the parallel package has [reportedly][1] (see also attached RRD output) not been doing its job, i.e. not actually balancing the load. My colleague Dirk Sarpe and I found the cause of the problem and we also have a patch to fix it (attached). A similar fix has also been provided [here][2].
[1]: https://stackoverflow.com/questions/38230831/why-does-parlapplylb-not-actually-balance-load [2]: https://bugs.r-project.org/bugzilla3/show_bug.cgi?id=16792 ## The Call Chain First, we traced the relevant R function calls through the code, beginning with `parLapplyLB`: 1. **parLapplyLB:** clusterApply.R:177, calls **splitList**, then **clusterApplyLB** 2. **splitList:** clusterApply.R:157 3. **clusterApplyLB:** clusterApply.R:87, calls **dynamicClusterApply** 4. **dynamicClusterApply:** clusterApply.R:39 ## splitList We used both our whiteboard and an R session to manually *run* a few examples. We were using lists of 100 elements and 5 workers. First, lets take a look at **splitList**: ```r > sapply(parallel:::splitList(1:100, 5), length) [1] 20 20 20 20 20 > sapply(parallel:::splitList(1:97, 5), length) [1] 20 19 19 19 20 > sapply(parallel:::splitList(1:97, 20), length) [1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5 ``` As we can see in the examples, the work is distributed as equally as possible. ## dynamicClusterApply **dynamicClusterApply** works this way (simplified): 1. it first gives a chunk to each worker 2. once a worker comes back with the result, it is given the next chunk **This is the important part:** As long as there are **more** chunks than workers, there will be load balancing. If there are fewer chunks than workers, each worker will get **at most one chunk** and there is **no** load balancing. ## parLapplyLB This is how **parLapplyLB** splits the input list (with a bit of refactoring, for readability): ```r parLapplyLB <- function(cl = NULL, X, fun, ...) { cl <- defaultCluster(cl) chunks <- splitList(X, length(cl)) do.call(c, clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...), quote = TRUE) } ``` For our examples, the chunks have these sizes: ```r > sapply(parallel:::splitList(1:100, 5), length) [1] 20 20 20 20 20 ``` There we have it: 5 chunks. 5 workers. With this work distribution, there can't possibly be any load balancing, because each worker is given a single chunk and then it stops working because there are no more chunks. Instead, **parLapplyLB** should look like this (patch is attached): ```r parLapplyLB <- function(cl = NULL, X, fun, ...) { cl <- defaultCluster(cl) chunkSize <- max(length(cl), ceiling(length(X) / length(cl))) chunks <- splitList(X, chunkSize) do.call(c, clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...), quote = TRUE) } ``` Examples with a cluster of 5 workers: ```r # length(cl) < length(X) > sapply(parallel:::splitList(1:100, ceiling(100 / 5)), length) [1] 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 # length(cl) >= length(X) > sapply(parallel:::splitList(1:4, 4), length) [1] 1 1 1 1 # one worker idles here, but we can't do better than that ``` With this patch, the number of chunks is larger than the number of workers, if possible at all, and then load balancing should work. Best Regards -- Christian Krause Scientific Computing Administration and Support ------------------------------------------------------------------------------------------------------------------------ Phone: +49 341 97 33144 Email: christian.kra...@idiv.de ------------------------------------------------------------------------------------------------------------------------ German Centre for Integrative Biodiversity Research (iDiv) Halle-Jena-Leipzig Deutscher Platz 5e 04103 Leipzig Germany ------------------------------------------------------------------------------------------------------------------------ iDiv is a research centre of the DFG – Deutsche Forschungsgemeinschaft iDiv ist eine zentrale Einrichtung der Universität Leipzig im Sinne des § 92 Abs. 1 SächsHSFG und wird zusammen mit der Martin-Luther-Universität Halle-Wittenberg und der Friedrich-Schiller-Universität Jena betrieben sowie in Kooperation mit dem Helmholtz-Zentrum für Umweltforschung GmbH – UFZ. Beteiligte Kooperationspartner sind die folgenden außeruniversitären Forschungseinrichtungen: das Helmholtz-Zentrum für Umweltforschung GmbH - UFZ, das Max-Planck-Institut für Biogeochemie (MPI BGC), das Max-Planck-Institut für chemische Ökologie (MPI CE), das Max-Planck-Institut für evolutionäre Anthropologie (MPI EVA), das Leibniz-Institut Deutsche Sammlung von Mikroorganismen und Zellkulturen (DSMZ), das Leibniz-Institut für Pflanzenbiochemie (IPB), das Leibniz-Institut für Pflanzengenetik und Kulturpflanzenforschung (IPK) und das Leibniz-Institut Senckenberg Museum für Naturkunde Görlitz (SMNG). USt-IdNr. DE 141510383
Index: src/library/parallel/R/clusterApply.R =================================================================== --- src/library/parallel/R/clusterApply.R (revision 74246) +++ src/library/parallel/R/clusterApply.R (working copy) @@ -177,9 +177,13 @@ parLapplyLB <- function(cl = NULL, X, fun, ...) { cl <- defaultCluster(cl) + + chunkSize <- max(length(cl), ceiling(length(X) / length(cl))) + + chunks <- splitList(X, chunkSize) + do.call(c, - clusterApplyLB(cl, x = splitList(X, length(cl)), - fun = lapply, fun, ...), + clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...), quote = TRUE) }
______________________________________________ R-devel@r-project.org mailing list https://stat.ethz.ch/mailman/listinfo/r-devel