Dear R-Devel List,

**TL;DR:** The function **parLapplyLB** of the parallel package has 
[reportedly][1] (see also attached RRD output) not
been doing its job, i.e. not actually balancing the load. My colleague Dirk 
Sarpe and I found the cause of the problem
and we also have a patch to fix it (attached). A similar fix has also been 
provided [here][2].

[1]: 
https://stackoverflow.com/questions/38230831/why-does-parlapplylb-not-actually-balance-load
[2]: https://bugs.r-project.org/bugzilla3/show_bug.cgi?id=16792


## The Call Chain

First, we traced the relevant R function calls through the code, beginning with 
`parLapplyLB`:

1.  **parLapplyLB:** clusterApply.R:177, calls **splitList**, then 
**clusterApplyLB**
2.  **splitList:** clusterApply.R:157
3.  **clusterApplyLB:** clusterApply.R:87, calls **dynamicClusterApply**
4.  **dynamicClusterApply:** clusterApply.R:39


## splitList

We used both our whiteboard and an R session to manually *run* a few examples. 
We were using lists of 100 elements and 5
workers. First, lets take a look at **splitList**:

```r
> sapply(parallel:::splitList(1:100, 5), length)
[1] 20 20 20 20 20

> sapply(parallel:::splitList(1:97, 5), length)
[1] 20 19 19 19 20

> sapply(parallel:::splitList(1:97, 20), length)
 [1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5
```

As we can see in the examples, the work is distributed as equally as possible.


## dynamicClusterApply

**dynamicClusterApply** works this way (simplified):

1.  it first gives a chunk to each worker
2.  once a worker comes back with the result, it is given the next chunk

**This is the important part:** As long as there are **more** chunks than 
workers, there will be load balancing. If
there are fewer chunks than workers, each worker will get **at most one chunk** 
and there is **no** load balancing.


## parLapplyLB

This is how **parLapplyLB** splits the input list (with a bit of refactoring, 
for readability):

```r
parLapplyLB <- function(cl = NULL, X, fun, ...)
{
    cl <- defaultCluster(cl)

    chunks <- splitList(X, length(cl))

    do.call(c,
            clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
            quote = TRUE)
}
```

For our examples, the chunks have these sizes:

```r
> sapply(parallel:::splitList(1:100, 5), length)
[1] 20 20 20 20 20
```

There we have it: 5 chunks. 5 workers. With this work distribution, there can't 
possibly be any load balancing, because
each worker is given a single chunk and then it stops working because there are 
no more chunks.

Instead, **parLapplyLB** should look like this (patch is attached):

```r
parLapplyLB <- function(cl = NULL, X, fun, ...)
{
    cl <- defaultCluster(cl)

    chunkSize <- max(length(cl), ceiling(length(X) / length(cl)))

    chunks <- splitList(X, chunkSize)

    do.call(c,
            clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
            quote = TRUE)
}
```

Examples with a cluster of 5 workers:

```r
# length(cl) < length(X)
> sapply(parallel:::splitList(1:100, ceiling(100 / 5)), length)
 [1] 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5

# length(cl) >= length(X)
> sapply(parallel:::splitList(1:4, 4), length)
[1] 1 1 1 1
# one worker idles here, but we can't do better than that
```

With this patch, the number of chunks is larger than the number of workers, if 
possible at all, and then load balancing
should work.

Best Regards

-- 

Christian Krause

Scientific Computing Administration and Support

------------------------------------------------------------------------------------------------------------------------

Phone: +49 341 97 33144

Email: christian.kra...@idiv.de

------------------------------------------------------------------------------------------------------------------------

German Centre for Integrative Biodiversity Research (iDiv) Halle-Jena-Leipzig

Deutscher Platz 5e

04103 Leipzig

Germany

------------------------------------------------------------------------------------------------------------------------

iDiv is a research centre of the DFG – Deutsche Forschungsgemeinschaft

iDiv ist eine zentrale Einrichtung der Universität Leipzig im Sinne des § 92 
Abs. 1 SächsHSFG und wird zusammen mit der
Martin-Luther-Universität Halle-Wittenberg und der 
Friedrich-Schiller-Universität Jena betrieben sowie in Kooperation
mit dem Helmholtz-Zentrum für Umweltforschung GmbH – UFZ. Beteiligte 
Kooperationspartner sind die folgenden
außeruniversitären Forschungseinrichtungen: das Helmholtz-Zentrum für 
Umweltforschung GmbH - UFZ, das
Max-Planck-Institut für Biogeochemie (MPI BGC), das Max-Planck-Institut für 
chemische Ökologie (MPI CE), das
Max-Planck-Institut für evolutionäre Anthropologie (MPI EVA), das 
Leibniz-Institut Deutsche Sammlung von Mikroorganismen
und Zellkulturen (DSMZ), das Leibniz-Institut für Pflanzenbiochemie (IPB), das 
Leibniz-Institut für Pflanzengenetik und
Kulturpflanzenforschung (IPK) und das Leibniz-Institut Senckenberg Museum für 
Naturkunde Görlitz (SMNG). USt-IdNr. DE
141510383

Index: src/library/parallel/R/clusterApply.R
===================================================================
--- src/library/parallel/R/clusterApply.R	(revision 74246)
+++ src/library/parallel/R/clusterApply.R	(working copy)
@@ -177,9 +177,13 @@
 parLapplyLB <- function(cl = NULL, X, fun, ...)
 {
     cl <- defaultCluster(cl)
+
+    chunkSize <- max(length(cl), ceiling(length(X) / length(cl)))
+
+    chunks <- splitList(X, chunkSize)
+
     do.call(c,
-            clusterApplyLB(cl, x = splitList(X, length(cl)),
-                           fun = lapply, fun, ...),
+            clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
             quote = TRUE)
 }
 
______________________________________________
R-devel@r-project.org mailing list
https://stat.ethz.ch/mailman/listinfo/r-devel

Reply via email to