There are many nuances (container affinity etc.), but broadly this is what happens.
The scheduler loops through all free containers. For each container, at node/rack/any level of locality, it picks the current top priority of tasks that need to be assigned. For that priority it asks the YARN AMRMClient to return pending requests at the given locality. These pending requests are returned in the order they were received. Thus tasks of the same priority from different vertices would be received intermingled, in the order in which they arrived.

Bikas

From: Fabio C. [mailto:[email protected]]
Sent: Friday, January 30, 2015 5:52 AM
To: [email protected]
Subject: Re: Questions about Tez under the hood

Thanks a lot. Now I was trying to figure out what happens if there are different tasks from two different vertexes at the same priority waiting for resources. If a suitable container becomes available, who is going to get it? I'm having a hard time finding the code about this. I was thinking that maybe we serve the first submitted vertex first, and among its tasks we follow the task id order, but it's just a guess.

On Wed, Jan 28, 2015 at 10:50 PM, Hitesh Shah <[email protected]> wrote:
Answers inline.

— Hitesh

On Jan 28, 2015, at 3:15 AM, Fabio <[email protected]> wrote:

> Hi everyone,
> I take back this mail since I have a few more questions about Tez. I am digging into the internal scheduling policy and I'm trying to fully understand how containers are assigned once Tez receives them from the RM. I am mainly referring to org.apache.tez.dag.app.rm.YarnTaskSchedulerService (I am currently on 0.5.0, I hope there have been no changes) and I am not considering locality (let's say I just have one node in the cluster).
>
> Could someone please confirm this?
> - When a response comes from the RM with container allocations for the application, those containers are added to the list of delayed containers (together with old containers already available for reuse), and their scheduling time is set to be 1 ms after the last scheduling time seen so far. They are assigned right away only if container reuse is disabled.

Yes. With container reuse disabled, there should be a 1:1 mapping from a pending task to an allocated container. With reuse enabled, existing containers are used first to reduce launch cost overheads.

> - The Tez scheduler will keep on trying to assign a container at its nextScheduleTime.

Yes - and whenever a new pending task comes in, it will also try to assign a container to it.

> - If we've just got any container from the RM, then the Tez scheduler will try to assign all delayed containers (old ones first and new ones last, since they are ordered according to their next scheduling time)

Yes - probably something which needs to be fixed. This was being done earlier, when a new container would be allocated first instead of a re-used container. It may no longer be needed, since the scheduling loop would get triggered anyway when the next schedule time elapses. It probably just acts, unintentionally, as a trigger to run a new matching loop.

> Just out of curiosity, why not order the delayed containers according to their expiry time?

The general flow is that on each loop, the scheduler tries to assign a container to a potential matching task. On each loop, the container's locality constraints are relaxed more and more to allow for more matches (node-local only on round 1, rack-local or node-local on round 2, …). By the time the container hits its final loop, it will match against any pending task. Sorting by next schedule time means the thread wakes up when it is time to run the next loop for a given container. Sorting by expiry would imply scanning the whole list to find all containers whose schedule time has elapsed.
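To make the loop described above easier to follow, here is a minimal, self-contained Java sketch of the idea. It is not the actual YarnTaskSchedulerService code: the class and field names are invented, the re-check delay is arbitrary, and rack resolution is stubbed out.

    import java.util.Comparator;
    import java.util.Deque;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.PriorityQueue;
    import java.util.TreeMap;

    // Illustrative sketch only: delayed containers are retried at their next
    // schedule time, and the locality they accept is relaxed on every retry.
    public class MatchingLoopSketch {

      static class PendingTask {
        final String requestedHost;            // hypothetical locality hint
        PendingTask(String requestedHost) { this.requestedHost = requestedHost; }
      }

      static class DelayedContainer {
        final String host;
        long nextScheduleTime;                 // containers are kept sorted by this
        int matchingRound = 0;                 // 0 = node-local, 1 = rack-local, 2+ = any
        DelayedContainer(String host, long nextScheduleTime) {
          this.host = host;
          this.nextScheduleTime = nextScheduleTime;
        }
      }

      // Pending requests per priority, in arrival order (the way AMRMClient hands
      // them back), so same-priority tasks from different vertices stay intermingled.
      private final NavigableMap<Integer, Deque<PendingTask>> pendingByPriority = new TreeMap<>();

      // Sorted by next schedule time: the scheduler thread only needs to wake up
      // for the head of the queue, instead of scanning the whole list.
      private final PriorityQueue<DelayedContainer> delayedContainers =
          new PriorityQueue<>(Comparator.comparingLong((DelayedContainer c) -> c.nextScheduleTime));

      /** One pass over the containers whose schedule time has elapsed. */
      void scheduleLoop(long now) {
        while (!delayedContainers.isEmpty()
            && delayedContainers.peek().nextScheduleTime <= now) {
          DelayedContainer c = delayedContainers.poll();
          if (tryAssign(c) == null) {
            c.matchingRound++;                 // no match: relax locality and retry later
            c.nextScheduleTime = now + 250;    // arbitrary re-check delay for the sketch
            delayedContainers.add(c);
          }
        }
      }

      /** Match the container against the current top priority only, at its current locality level. */
      private PendingTask tryAssign(DelayedContainer c) {
        for (Map.Entry<Integer, Deque<PendingTask>> e : pendingByPriority.entrySet()) {
          Deque<PendingTask> tasks = e.getValue();
          for (Iterator<PendingTask> it = tasks.iterator(); it.hasNext(); ) {
            PendingTask t = it.next();
            boolean match = c.matchingRound >= 2                                   // final round: anything
                || (c.matchingRound == 1 && sameRack(t.requestedHost, c.host))     // round 2: rack
                || t.requestedHost.equals(c.host);                                 // round 1: node
            if (match) {
              it.remove();                     // "assign" the container to this task
              return t;
            }
          }
          if (!tasks.isEmpty()) {
            return null;                       // top priority had work but nothing matched yet
          }
        }
        return null;
      }

      private boolean sameRack(String hostA, String hostB) {
        return true;                           // placeholder rack resolution
      }
    }

The real scheduler also handles held-container release timeouts, container affinity and so on; only the relaxation-per-round and the schedule-time ordering described above are sketched here.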
>
> Thanks
>
> Fabio
>
> On 10/29/2014 04:48 PM, Hitesh Shah wrote:
>> Answers inline.
>>
>> — Hitesh
>>
>> On Oct 29, 2014, at 2:33 AM, Fabio <[email protected]> wrote:
>>
>>> Thanks Bikas for your answer and suggestion. Actually my work deals more with high-level modeling/behavior/performance of Tez, but there is another guy who is going to handle the Tez sources; I will suggest that he contribute. I've just found many commented configuration parameters in org.apache.tez.dag.api.TezConfiguration that I didn't even know about; they will help.
>>>
>>> Right now I have another question that came to my mind while modeling Tez. Situation: I have a DAG with 2 tasks waiting to run, and the cluster is quite overloaded. The Tez AM will ask the Resource Manager for 2 containers and wait for them. At some point a single container becomes available, so a task can run and finish, and Tez (I guess) will exploit that same container for reuse. But what about the other request sent to the RM? Is it somehow actively voided by Tez, or at some point will it just get another container that won't be used (and possibly discarded afterward)? I don't even know if YARN has such a feature for removing a previously submitted request to the RM.
>>>
>> [Hitesh] Tez will always ask the RM for as many containers as the tasks it needs to run. In cases when a task is scheduled to run on an existing available container, it will do so based on certain conditions, such as checking if the data needed by the task is available on the same node and/or rack as that of the existing container.
>>
>> In terms of the RM request management, the protocol between the RM and an ApplicationMaster is more or less an update protocol (and not an incremental one). Based on your example, Tez would first ask the RM for 2 containers. Once it gets one, it will keep on telling the RM that it now needs one. If the previously assigned container is also used for the 2nd task, it will update the ask to the RM to 0 containers. There is obviously a minor race condition where the RM may have already allocated the container before Tez is able to tell it that it does not need the additional container. In such cases, Tez will get an additional allocation which it does not need, but release it in due time (the YARN protocol supports releasing containers without using them).
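The update-style (rather than incremental) ask described above can be seen with a toy model like the one below. This is purely illustrative Java, not Tez or YARN code: on each heartbeat the AM re-states its total outstanding need, and any allocation that arrives after the need has dropped to zero is released unused.

    // Toy model of the update-style ask described above; not Tez or YARN code.
    // The AM re-states its total need, and releases allocations it no longer wants.
    public class AskUpdateSketch {

      private int pendingTasks;                // tasks that still need a container

      AskUpdateSketch(int pendingTasks) { this.pendingTasks = pendingTasks; }

      /** What the AM would tell the RM on the next heartbeat: its current total ask. */
      int currentAsk() { return pendingTasks; }

      /** A container was allocated by the RM, or an existing one became free for reuse. */
      void onContainerAvailable(String containerId) {
        if (pendingTasks > 0) {
          pendingTasks--;                      // run a waiting task on it
          System.out.println(containerId + " assigned, ask is now " + currentAsk());
        } else {
          // Race: the RM allocated before it saw the lowered ask. Release it unused.
          System.out.println(containerId + " not needed, releasing back to the RM");
        }
      }

      public static void main(String[] args) {
        AskUpdateSketch am = new AskUpdateSketch(2);  // two tasks waiting, so the ask starts at 2
        System.out.println("initial ask: " + am.currentAsk());
        am.onContainerAvailable("container_1");       // first allocation: ask drops to 1
        am.onContainerAvailable("container_1");       // the same container, reused: ask drops to 0
        am.onContainerAvailable("container_2");       // late extra allocation: released unused
      }
    }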
[mailto:[email protected]<mailto:[email protected]>] >>>> Sent: Monday, October 27, 2014 7:08 AM >>>> To: [email protected]<mailto:[email protected]> >>>> Subject: Questions about Tez under the hood >>>> Hi guys, I'm currently working at my master degree thesis on Tez, and I >>>> am trying to understand how Tez works under the hood. I have some >>>> questions, I hope someone can help with this: >>>> >>>> 1) How does Tez handle containers for reuse? Are they kept for some >>>> seconds (how long?) in a sort of buffer waiting for tasks which will need >>>> them? Or a container is sent back to the RM if no task is immediately >>>> ready to take it? >>>> >>>> [Bikas] Yes they wait around for a buffer period of time. Idle containers >>>> are released back the RM randomly between a mix and a max release time >>>> until a minimum held container threshold is met. So the behavior can be >>>> customized using the min/max timeouts and the min held threshold. >>>> >>>> 2) Let's say I have a DAG with two branches proceeding in parallel before >>>> joining in a root node (such as the example on the tez home >>>> pagehttp://tez.apache.org/images/PigHiveQueryOnTez.png<http://cp.mcafee.com/d/5fHCN0gdEI9FK9LCzBBYTsSztNWXX3bb1J6XxEVosphhjdETsuK-MOO-rhKUqekmkkhNEVdA95tE2ytoPH0Nm9mDbCOtoPH0Nm9mDbC_uIYgIM_R-uuosvLRXBQhPMXAnbL3CjhPORQX8FGTKVOEuvkzaT0QSyrhdTVdByX2rXXapKVI07fNjJmSNf-00V9nifQMbi8xuhDM7r-9ocg1fyDbg8CSWv4L7VJNwn76zAsn8iaXgUDmcWMclylFOUaDUFSHroD_00jqrzPapI5-Aq807r-AVlxgQgqgGTcQg0DNgQgdMg-9EwCjYQg4WRgdIL6Pq4mF8lDLDck> >>>> ). In this case, we will have both branches running at the same time. At >>>> some point we may have the first branch that is almost complete, while the >>>> second is still at an early stage. In this case, does Tez knows that "soon >>>> or later" the two branches will merge, thus there will be a common >>>> consumer waiting for the slower branch to complete? Actually the real >>>> question is: does Tez prioritize the scheduling/resource allocation of >>>> tasks belonging to slower branches? If yes, what kind of policy is >>>> adopted? Is it configurable? >>>> >>>> [Bikas] Currently the priority of a vertex is the distance from the source >>>> of the DAG. So vertices can run in parallel. On the roadmap are items like >>>> critical path scheduling where the vertex that is holding up the job the >>>> most or that’s going to unblock the most amount of downstream work to be >>>> given higher priority. >>>> >>>> 3) tez.am.shuffle-vertex-manager.min-src-fraction: if I have a dag made of >>>> two producer vertexes, each one running 10 tasks, and below them a >>>> consumer vertex, let's say running 5 tasks, so if this property is set to >>>> 0.2, does it mean that before running any consumer task we need 2 producer >>>> tasks to complete for each of the producer vertexes? Or are they >>>> considered as a whole and we need just 4 tasks completed (even just from >>>> one vertex)? >>>> >>>> [Bikas] It currently looks at the fraction of the whole (both combined) >>>> but we are going to change it to look at the fraction per source vertex. >>>> Again, this is just a hint. (With auto-parallelism on) the vertex also >>>> looks at whether enough data has been produced before triggering the tasks >>>> because the real intention is to have enough data available for the reduce >>>> to pull so that it can overlap the pull with the completion of the map >>>> tasks. 
>>>>
>>>> 4) As far as I understand, a single Tez Application Master can handle multiple DAGs at the same time, but only if the user application has been coded to do so (for example, if I run two wordcounts as the same user, it simply creates two different Tez App Masters). Is this correct?
>>>>
>>>> [Bikas] If the TezClient is started in session mode then it re-uses the App Master for multiple DAGs. The code is the same in session and non-session mode. The behavior can be changed via configuration (or hard-coded in the code if desired), so you can use both modes with the same code. To be clear, the App Master does not run DAGs concurrently. It runs one DAG at a time.
>>>>
>>>> Thanks in advance
>>>>
>>>> Fabio
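To illustrate answer 4: in session mode one TezClient, and hence one App Master, runs several DAGs back to back. Below is a minimal sketch; buildDag() is a placeholder for your own DAG construction, and the TezClient API shape is the one I recall from the 0.5 line, so check it against your version.

    import org.apache.tez.client.TezClient;
    import org.apache.tez.dag.api.DAG;
    import org.apache.tez.dag.api.TezConfiguration;
    import org.apache.tez.dag.api.client.DAGClient;

    // Minimal session-mode sketch: one AM, several DAGs submitted one at a time.
    public class SessionSketch {
      public static void main(String[] args) throws Exception {
        TezConfiguration conf = new TezConfiguration();

        // isSession=true: the same App Master is reused for every submitted DAG.
        // (Equivalently, tez.am.mode.session can be set in the configuration.)
        TezClient client = TezClient.create("wordcount-session", conf, true);
        client.start();
        try {
          for (DAG dag : new DAG[] { buildDag("input1"), buildDag("input2") }) {
            DAGClient dagClient = client.submitDAG(dag);   // same AM; one DAG at a time
            dagClient.waitForCompletion();
          }
        } finally {
          client.stop();
        }
      }

      private static DAG buildDag(String input) {
        // Placeholder: construct and return the DAG for the given input here.
        throw new UnsupportedOperationException("DAG construction elided in this sketch");
      }
    }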
