Dear Tomas,

Thanks for your commitment to fix this issue and also to add the chunk size as 
an argument. If you want our input, let us know ;)

Best Regards

On 02/26/2018 04:01 PM, Tomas Kalibera wrote:
> Dear Christian and Henrik,
> 
> thank you for spotting the problem and suggestions for a fix. We'll probably 
> add a chunk.size argument to parLapplyLB and parLapply to follow OpenMP 
> terminology, which has already been an inspiration for the present code 
> (parLapply already implements static scheduling via internal function 
> staticClusterApply, yet with a fixed chunk size; parLapplyLB already 
> implements dynamic scheduling via internal function dynamicClusterApply, but 
> with a fixed chunk size set to an unlucky value so that it behaves like 
> static scheduling). The default chunk size for parLapplyLB will be set 
> so that there is some dynamism in the schedule even by default. I am now 
> testing a patch with these changes.
> 
> Best
> Tomas
> 
> 
> On 02/20/2018 11:45 AM, Christian Krause wrote:
>> Dear Henrik,
>>
>> The rationale is just that it is within these extremes and that it is really 
>> simple to calculate, without making any assumptions and knowing that it 
>> won't be perfect.
>>
>> The extremes A and B you are mentioning are special cases based on 
>> assumptions. Case A is based on the assumption that the function has a long 
>> runtime or varying runtime, then you are likely to get the best load 
>> balancing with really small chunks. Case B is based on the assumption that 
>> the function runtime is the same for each list element, i.e. where you don't 
>> actually need load balancing, i.e. just use `parLapply` without load 
>> balancing.
>>
>> This new default is **not the best one**. It's just a better one than we had 
>> before. There is no best one we can use as default because **we don't know 
>> the function runtime and how it varies**. The user needs to decide that 
>> because he/she knows the function. As mentioned before, I will write a patch 
>> that makes the chunk size an optional argument, so the user can decide 
>> because only he/she has all the information to choose the best chunk size, 
>> just like you did with the `future.scheduling` parameter.
>>
>> Best Regards
>>
>> On February 19, 2018 10:11:04 PM GMT+01:00, Henrik Bengtsson 
>> <henrik.bengts...@gmail.com> wrote:
>>> Hi, I'm trying to understand the rationale for your proposed amount of
>>> splitting and more precisely why that one is THE one.
>>>
>>> If I put labels on your example numbers in one of your previous posts:
>>>
>>> nbrOfElements <- 97
>>> nbrOfWorkers <- 5
>>>
>>> With these, there are two extremes in how you can split up the
>>> processing in chunks such that all workers are utilized:
>>>
>>> (A) Each worker, called multiple times, processes one element each
>>> time:
>>>
>>>> nbrOfElements <- 97
>>>> nbrOfWorkers <- 5
>>>> nbrOfChunks <- nbrOfElements
>>>> sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
>>> [1] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
>>> [30] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
>>> [59] 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
>>> [88] 1 1 1 1 1 1 1 1 1 1
>>>
>>>
>>> (B) Each worker, called once, processes multiple elements:
>>>
>>>> nbrOfElements <- 97
>>>> nbrOfWorkers <- 5
>>>> nbrOfChunks <- nbrOfWorkers
>>>> sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
>>> [1] 20 19 19 19 20
>>>
>>> I understand that neither of these two extremes may be the best when
>>> it comes to orchestration overhead and load balancing. Instead, the
>>> best might be somewhere in-between, e.g.
>>>
>>> (C) Each worker, called multiple times, processing multiple elements:
>>>
>>>> nbrOfElements <- 97
>>>> nbrOfWorkers <- 5
>>>> nbrOfChunks <- nbrOfElements / nbrOfWorkers
>>>> sapply(parallel:::splitList(1:nbrOfElements, nbrOfChunks), length)
>>> [1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5
>>>
>>> However, there are multiple alternatives between the two extremes, e.g.
>>>
>>>> nbrOfChunks <- scale * nbrOfElements / nbrOfWorkers
>>> So, is there a reason why you argue that scale = 1.0 is optimal?
>>>
>>> FYI, In future.apply::future_lapply(X, FUN, ...) there is a
>>> 'future.scheduling' scale factor(*) argument where default
>>> future.scheduling = 1 corresponds to (B) and future.scheduling = +Inf
>>> to (A).  Using future.scheduling = 4 achieves the amount of
>>> load-balancing you propose in (C).   (*) Different definition from the
>>> above 'scale'. (Disclaimer: I'm the author)
>>>
>>> /Henrik
>>>
>>> On Mon, Feb 19, 2018 at 10:21 AM, Christian Krause
>>> <christian.kra...@idiv.de> wrote:
>>>> Dear R-Devel List,
>>>>
>>>> I have installed R 3.4.3 with the patch applied on our cluster and ran a
>>>> *real-world* job of one of our users to confirm that the patch works to my
>>>> satisfaction. Here are the results.
>>>>
>>>> The original was a series of jobs, all essentially doing the same stuff
>>>> using bootstrapped data, so for the original there is more data and I show
>>>> the arithmetic mean with standard deviation. The confirmation with the
>>>> patched R was only a single instance of that series of jobs.
>>>>
>>>> ## Job Efficiency
>>>>
>>>> The job efficiency is defined as follows (this is what the
>>>> `qacct-efficiency` tool below does):
>>>>
>>>> ```
>>>> efficiency = cputime / cores / wallclocktime * 100%
>>>> ```
>>>>
>>>> In simpler words: how well did the job utilize its CPU cores. It shows the
>>>> percentage of time the job was actually doing stuff, as opposed to the
>>>> difference:
>>>>
>>>> ```
>>>> wasted = 100% - efficiency
>>>> ```
>>>>
>>>> ... which, essentially, tells us how much of the resources were wasted,
>>>> i.e. CPU cores just idling, without being used by anyone. We care a lot
>>>> about that because, for our scientific computing cluster, wasted resources
>>>> are like burning money.
>>>>
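The two formulas above can be written as a small R sketch. The function names and the example numbers are hypothetical and only illustrate the arithmetic; this is not the `qacct-efficiency` tool itself.

```r
# Efficiency in percent: share of the reserved core time that was actually used.
job_efficiency <- function(cputime, cores, wallclock) {
  cputime / cores / wallclock * 100
}

# Wasted share in percent: the remainder.
job_wasted <- function(efficiency) {
  100 - efficiency
}

# Hypothetical job: 4 cores reserved for 1000 s, 3000 s of CPU time consumed.
eff <- job_efficiency(cputime = 3000, cores = 4, wallclock = 1000)
eff              # 75
job_wasted(eff)  # 25
```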
>>>> ### original
>>>>
>>>> This takes the entire series from our job accounting database, filters
>>>> the successful jobs, calculates the efficiency and then shows the average
>>>> and standard deviation of the efficiency:
>>>>
>>>> ```
>>>> $ qacct -j 4433299 | qacct-success | qacct-efficiency | meansd
>>>> n=945 ∅ 61.7276 ± 7.78719
>>>> ```
>>>>
>>>> This takes the entire series from our job accounting database, filters
>>>> the successful jobs, calculates the efficiency and does sort of a
>>>> histogram-like binning before calculating mean and standard deviation (to
>>>> get a more detailed impression of the distribution when the standard
>>>> deviation of the previous command is comparatively high):
>>>>
>>>> ```
>>>> $ qacct -j 4433299 | qacct-success | qacct-efficiency | meansd-bin -w 10 | sort -gk1 | column -t
>>>> 10  -  20  ->  n=3    ∅  19.21666666666667   ±  0.9112811494447459
>>>> 20  -  30  ->  n=6    ∅  26.418333333333333  ±  2.665996374091058
>>>> 30  -  40  ->  n=12   ∅  35.11583333333334   ±  2.8575783082671196
>>>> 40  -  50  ->  n=14   ∅  45.35285714285715   ±  2.98623361591005
>>>> 50  -  60  ->  n=344  ∅  57.114593023255814  ±  2.1922005551774415
>>>> 60  -  70  ->  n=453  ∅  64.29536423841049   ±  2.8334788433963856
>>>> 70  -  80  ->  n=108  ∅  72.95592592592598   ±  2.5219474143639276
>>>> 80  -  90  ->  n=5    ∅  81.526              ±  1.2802265424525452
>>>> ```
>>>>
>>>> I attached an example graph from our monitoring system of a single
>>>> instance to my previous mail. There you can see that the load balancing
>>>> does not actually work, i.e. it behaves the same as `parLapply`. This is
>>>> reflected in the job efficiency.
>>>>
>>>> ### patch applied
>>>>
>>>> This is the single instance I used to confirm that the patch works:
>>>>
>>>> ```
>>>> $ qacct -j 4562202 | qacct-efficiency
>>>> 97.36
>>>> ```
>>>>
>>>> The graph from our monitoring system is attached. As you can see, the load
>>>> balancing works to a satisfying degree and the efficiency is well above
>>>> 90%, which was what I had hoped for :-)
>>>>
>>>> ## Additional Notes
>>>>
>>>> The list used in this job's `parLapplyLB` is 5812 elements long. With the
>>>> `splitList`-chunking from the patch, you'll get 208 lists of about 28
>>>> elements (208 chunks of size 28). The job ran on 28 CPU cores and had a
>>>> wallclock time of 120351.590 seconds, i.e. 33.43 hours. Thus, the function
>>>> we apply to our list takes about 580 seconds per list element, i.e. about
>>>> 10 minutes. I suppose, for that runtime, we would get even better load
>>>> balancing if we reduced the chunk size even further, maybe even down to 1,
>>>> thus getting our efficiency even closer to 100%.
>>>>
>>>> Of course, for really short-running functions, a higher chunk size may be
>>>> more efficient because of the overhead. In our case, the overhead is
>>>> negligible and that is why the low chunk size works really well. In
>>>> contrast, for smallish lists with short-running functions, you might not
>>>> even need load balancing and `parLapply` suffices. It only becomes an issue
>>>> when the runtime of the function is high and/or varying.
>>>>
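The numbers in the notes above can be re-derived in a few lines of R. This is only a sketch of the arithmetic: the variable names are made up here, and the per-element runtime assumes that all 28 cores were busy for the whole wallclock time.

```r
n_elements <- 5812        # length of the list passed to parLapplyLB
n_workers  <- 28          # CPU cores the job ran on
wallclock  <- 120351.590  # seconds

# Number of chunks as computed by the patched parLapplyLB
n_chunks <- max(n_workers, ceiling(n_elements / n_workers))
n_chunks                               # 208

elements_per_chunk <- n_elements / n_chunks
elements_per_chunk                     # about 27.9

wallclock / 3600                       # about 33.43 hours

# Rough runtime per list element (assumes every core was busy the whole time)
per_element <- wallclock * n_workers / n_elements
per_element                            # about 580 seconds
```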
>>>> In our case, the entire runtime of the entire series of jobs was:
>>>>
>>>> ```
>>>> $ qacct -j 4433299 | awk '$1 == "wallclock" { sum += $2 } END { print sum, "seconds" }'
>>>> 4.72439e+09 seconds
>>>> ```
>>>>
>>>> That's about 150 years on a single core or 7.5 years on a 20 core server!
>>>> Our user was constantly using about 500 cores, so this took about 110 days.
>>>> If you compare this to my 97% efficiency example, the jobs could have been
>>>> finished in 75 days instead ;-)
>>>>
>>>> ## Upcoming Patch
>>>>
>>>> If this patch gets applied to the R code base (and I hope it will :-)) my
>>>> colleague and I will submit another patch that adds the chunk size as an
>>>> optional parameter to all of the load balancing functions. With that
>>>> parameter, users of these functions *can* decide for themselves which chunk
>>>> size they prefer for their code. As mentioned before, the most efficient
>>>> chunk size depends on the runtime of the function being applied, which is
>>>> the only thing R does not know and users really should be allowed to
>>>> specify explicitly. The default of this new optional parameter would be the
>>>> one we used here and this would make that upcoming patch fully
>>>> source-compatible.
>>>>
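To make the idea of a user-supplied chunk size concrete, here is a minimal sketch of what such a function could look like. It is not the patch mentioned above: the name `parLapplyLB_chunked`, the argument name `chunk.size` and its default of 1 are assumptions made for illustration. Only `splitIndices`, `clusterApplyLB`, `makeCluster` and `stopCluster` are taken from the parallel package.

```r
library(parallel)

# Hypothetical wrapper: chunk.size is the (approximate) number of list
# elements handed to a worker in one go.
parLapplyLB_chunked <- function(cl, X, FUN, ..., chunk.size = 1L) {
  n_chunks <- ceiling(length(X) / chunk.size)
  idx <- splitIndices(length(X), n_chunks)    # index vectors, one per chunk
  chunks <- lapply(idx, function(i) X[i])
  # Each worker call runs lapply(chunk, FUN, ...) on the chunk it receives;
  # clusterApplyLB hands out the next chunk as soon as a worker is free.
  do.call(c,
          clusterApplyLB(cl, x = chunks, fun = lapply, FUN, ...),
          quote = TRUE)
}

cl <- makeCluster(2)
res <- parLapplyLB_chunked(cl, 1:10, function(x) x^2, chunk.size = 3)
stopCluster(cl)
unlist(res)
```

A small `chunk.size` gives more chunks and therefore more room for balancing, at the cost of more communication with the workers.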
>>>> Best Regards
>>>>
>>>> On 02/12/2018 08:08 PM, Christian Krause wrote:
>>>>> Dear R-Devel List,
>>>>>
>>>>> **TL;DR:** The function **parLapplyLB** of the parallel package has
>>>>> [reportedly][1] (see also attached RRD output) not been doing its job,
>>>>> i.e. not actually balancing the load. My colleague Dirk Sarpe and I found
>>>>> the cause of the problem and we also have a patch to fix it (attached). A
>>>>> similar fix has also been provided [here][2].
>>>>>
>>>>> [1]: https://stackoverflow.com/questions/38230831/why-does-parlapplylb-not-actually-balance-load
>>>>> [2]: https://bugs.r-project.org/bugzilla3/show_bug.cgi?id=16792
>>>>>
>>>>>
>>>>> ## The Call Chain
>>>>>
>>>>> First, we traced the relevant R function calls through the code,
>>>>> beginning with `parLapplyLB`:
>>>>>
>>>>> 1.  **parLapplyLB:** clusterApply.R:177, calls **splitList**, then
>>>>>     **clusterApplyLB**
>>>>> 2.  **splitList:** clusterApply.R:157
>>>>> 3.  **clusterApplyLB:** clusterApply.R:87, calls **dynamicClusterApply**
>>>>> 4.  **dynamicClusterApply:** clusterApply.R:39
>>>>>
>>>>>
>>>>> ## splitList
>>>>>
>>>>> We used both our whiteboard and an R session to manually *run* a few
>>>>> examples. We were using lists of 100 elements and 5 workers. First, let's
>>>>> take a look at **splitList**:
>>>>>
>>>>> ```r
>>>>>> sapply(parallel:::splitList(1:100, 5), length)
>>>>> [1] 20 20 20 20 20
>>>>>
>>>>>> sapply(parallel:::splitList(1:97, 5), length)
>>>>> [1] 20 19 19 19 20
>>>>>
>>>>>> sapply(parallel:::splitList(1:97, 20), length)
>>>>>   [1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5
>>>>> ```
>>>>>
>>>>> As we can see in the examples, the work is distributed as equally as
>>>>> possible.
>>>>>
>>>>> ## dynamicClusterApply
>>>>>
>>>>> **dynamicClusterApply** works this way (simplified):
>>>>>
>>>>> 1.  it first gives a chunk to each worker
>>>>> 2.  once a worker comes back with the result, it is given the next chunk
>>>>>
>>>>> **This is the important part:** As long as there are **more** chunks than
>>>>> workers, there will be load balancing. If there are only as many chunks as
>>>>> workers, or fewer, each worker will get **at most one chunk** and there is
>>>>> **no** load balancing.
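The effect can be illustrated with a small simulation. This is a simplified model under stated assumptions, not the actual `dynamicClusterApply` code: the helper names and the workload (100 elements, the first 20 ten times slower than the rest) are made up, and communication overhead is ignored. `splitIndices` is the exported helper from the parallel package.

```r
library(parallel)

# Greedy dynamic scheduling: the next chunk always goes to the worker that
# becomes free first. Returns the wall-clock time of the whole job.
simulate_makespan <- function(chunk_runtimes, n_workers) {
  busy_until <- numeric(n_workers)   # time at which each worker is free again
  for (rt in chunk_runtimes) {
    w <- which.min(busy_until)
    busy_until[w] <- busy_until[w] + rt
  }
  max(busy_until)
}

# Hypothetical workload: 100 elements, the first 20 are ten times slower.
element_runtimes <- c(rep(10, 20), rep(1, 80))

# Total runtime of each chunk when the elements are split into n_chunks pieces.
chunk_runtimes <- function(n_chunks) {
  idx <- splitIndices(length(element_runtimes), n_chunks)
  vapply(idx, function(i) sum(element_runtimes[i]), numeric(1))
}

makespan_5  <- simulate_makespan(chunk_runtimes(5),  n_workers = 5)
makespan_20 <- simulate_makespan(chunk_runtimes(20), n_workers = 5)
makespan_5    # 200: one worker gets all slow elements, the others idle
makespan_20   # 60: close to the ideal of 280 / 5 = 56
```

With 5 chunks on 5 workers the schedule is fixed up front, so the job takes as long as its slowest chunk. With 20 chunks, workers that finish early pick up the remaining work.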
>>>>>
>>>>> ## parLapplyLB
>>>>>
>>>>> This is how **parLapplyLB** splits the input list (with a bit of
>>>>> refactoring, for readability):
>>>>>
>>>>> ```r
>>>>> parLapplyLB <- function(cl = NULL, X, fun, ...)
>>>>> {
>>>>>      cl <- defaultCluster(cl)
>>>>>
>>>>>      chunks <- splitList(X, length(cl))
>>>>>
>>>>>      do.call(c,
>>>>>              clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
>>>>>              quote = TRUE)
>>>>> }
>>>>> ```
>>>>>
>>>>> For our examples, the chunks have these sizes:
>>>>>
>>>>> ```r
>>>>>> sapply(parallel:::splitList(1:100, 5), length)
>>>>> [1] 20 20 20 20 20
>>>>> ```
>>>>>
>>>>> There we have it: 5 chunks. 5 workers. With this work distribution, there
>>>>> can't possibly be any load balancing, because each worker is given a
>>>>> single chunk and then it stops working because there are no more chunks.
>>>>>
>>>>> Instead, **parLapplyLB** should look like this (patch is attached):
>>>>>
>>>>> ```r
>>>>> parLapplyLB <- function(cl = NULL, X, fun, ...)
>>>>> {
>>>>>      cl <- defaultCluster(cl)
>>>>>
>>>>>      # note: despite its name, this is the number of chunks given to splitList
>>>>>      chunkSize <- max(length(cl), ceiling(length(X) / length(cl)))
>>>>>
>>>>>      chunks <- splitList(X, chunkSize)
>>>>>
>>>>>      do.call(c,
>>>>>              clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
>>>>>              quote = TRUE)
>>>>> }
>>>>> ```
>>>>>
>>>>> Examples with a cluster of 5 workers:
>>>>>
>>>>> ```r
>>>>> # length(cl) < length(X)
>>>>>> sapply(parallel:::splitList(1:100, ceiling(100 / 5)), length)
>>>>>   [1] 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5
>>>>>
>>>>> # length(cl) >= length(X)
>>>>>> sapply(parallel:::splitList(1:4, 4), length)
>>>>> [1] 1 1 1 1
>>>>> # one worker idles here, but we can't do better than that
>>>>> ```
>>>>>
>>>>> With this patch, the number of chunks is larger than the number of
>>>>> workers, if possible at all, and then load balancing should work.
>>>>>
>>>>> Best Regards
>>>>>
>>>>>
>>>>>
>>>>> ______________________________________________
>>>>> R-devel@r-project.org mailing list
>>>>> https://stat.ethz.ch/mailman/listinfo/r-devel
>>>>>
>>>> -- 
>>>> Christian Krause
>>>>
>>>> Scientific Computing Administration and Support
>>>>
>>>>
>>>> ----------------------------------------
>>>> Email: christian.kra...@idiv.de
>>>>
>>>> Office: BioCity Leipzig 5e, Room 3.201.3
>>>>
>>>> Phone: +49 341 97 33144
>>>>
>>>> ----------------------------------------
>>>> German Centre for Integrative Biodiversity Research (iDiv)
>>>> Halle-Jena-Leipzig
>>>>
>>>> Deutscher Platz 5e
>>>>
>>>> 04103 Leipzig
>>>>
>>>> Germany
>>>>
>>>> ----------------------------------------
>>>> iDiv is a research centre of the DFG – Deutsche Forschungsgemeinschaft
>>>>
>>>> iDiv is a central institution of Universität Leipzig within the meaning of
>>>> § 92 para. 1 SächsHSFG and is run jointly with
>>>> Martin-Luther-Universität Halle-Wittenberg and
>>>> Friedrich-Schiller-Universität Jena, as well as in cooperation with the
>>>> Helmholtz-Zentrum für Umweltforschung GmbH – UFZ. Participating cooperation
>>>> partners are the following non-university research institutions: the
>>>> Helmholtz-Zentrum für Umweltforschung GmbH - UFZ, the Max Planck Institute
>>>> for Biogeochemistry (MPI BGC), the Max Planck Institute for Chemical
>>>> Ecology (MPI CE), the Max Planck Institute for Evolutionary Anthropology
>>>> (MPI EVA), the Leibniz Institute German Collection of Microorganisms and
>>>> Cell Cultures (DSMZ), the Leibniz Institute of Plant Biochemistry (IPB),
>>>> the Leibniz Institute of Plant Genetics and Crop Plant Research (IPK) and
>>>> the Leibniz Institute Senckenberg Museum of Natural History Görlitz (SMNG).
>>>> VAT ID no. DE 141510383
> 
> 

-- 
Christian Krause

Scientific Computing Administration and Support


