Hadoop Internal Architecture writeup (failure handling)
More questions on component failure handling. Can anyone confirm (or correct) these?

1) When a TaskTracker crashes, the JobTracker, having heard no heartbeat from it within the timeout period, will conclude that it has crashed and re-allocate its unfinished tasks to other TaskTrackers. Correct?

2) If the original TaskTracker is merely overloaded rather than crashed, what happens if it submits its task result afterwards?

3) When a DataNode crashes, the NameNode, having heard no heartbeat from it within the timeout period, will conclude that it has crashed. The NameNode will gradually re-replicate the data chunks of the failed DataNode to other DataNodes to restore the replication factor. Correct?

4) Now what if the crashed DataNode reboots after an hour and rejoins the cluster? It will report to the NameNode which chunks it has. How does the NameNode recognize which chunks are outdated? Using some kind of version number?

5) After the NameNode detects that certain chunks are outdated, does it simply discard them (and garbage-collect them later), or does it try to bring them up to date (e.g. send all the deltas)?

6) When the NameNode crashes, no HDFS write can proceed, but HDFS reads can proceed if the client already has a chunk handle to the DataNode. Correct? This situation continues until the NameNode is recovered, at which point things return to normal. Correct?

7) When the JobTracker crashes, all jobs that have not completed are discarded. When the JobTracker restarts, clients need to resubmit those jobs. Correct?

Rgds, Ricky
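The heartbeat-timeout failure detection discussed in this thread (for both TaskTrackers and DataNodes) can be sketched as follows. This is an illustrative toy, not Hadoop source; the class and method names are made up for the example:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch (not Hadoop code): a master marks a worker dead
// once no heartbeat has arrived within a timeout, after which its
// work can be reassigned or its blocks re-replicated.
class HeartbeatMonitor {
    private final long timeoutMs;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    HeartbeatMonitor(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Record a heartbeat from a worker at the given time.
    void heartbeat(String workerId, long nowMs) {
        lastHeartbeat.put(workerId, nowMs);
    }

    // A worker is presumed dead if it has never reported, or if its
    // last report is older than the timeout.
    boolean isDead(String workerId, long nowMs) {
        Long last = lastHeartbeat.get(workerId);
        return last == null || nowMs - last > timeoutMs;
    }
}
```

Note that this model also answers question 2 in spirit: a late result from a worker already declared dead arrives under a task attempt the master no longer considers valid, so it can simply be ignored.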
RE: Hadoop Internal Architecture writeup
Hey, nice work and a nice writeup. Keep it up. Comments inline.
Amar

-Original Message-
From: Ricky Ho [mailto:[EMAIL PROTECTED]
Sent: Fri 11/28/2008 9:45 AM
To: core-user@hadoop.apache.org
Subject: RE: Hadoop Internal Architecture writeup

Amar, thanks a lot. This is exactly the kind of feedback I am looking for ... I have some more questions ...

==
The jobclient, while submitting the job, calculates the splits using the InputFormat specified by the user. Internally the InputFormat might make use of the dfs block size, user-hinted num-maps, etc. The jobtracker is given 3 files:
- job.xml : job control parameters
- job.split : the split file
- job.jar : user map-reduce code
==

[Ricky] What exactly does job.split contain? I assume it contains the specification for each split (but not its data), such as the corresponding file and the byte range within that file. Correct?

==
This process is interleaved/parallelized. As soon as a map is done, the JobTracker is notified. Once a tracker (with a reducer) asks for events, these new events are passed. Hence the map-output pulling (Shuffle Phase) works in parallel with the Map Phase. The Reduce Phase can start only once all the (respective) map outputs are copied and merged.
==

[Ricky] I am curious why the reduce execution can't start earlier (before all the map tasks have completed). The value iterator inside the user-defined reduce() method could block to wait for more map-task completions. In other words, map() and reduce() could proceed with pipeline parallelism.

==
There is a 1-1 mapping between a split and a map task. Hence it will re-run the map on the corresponding split.
==

[Ricky] Do you mean that if the job has 5000 splits, then it requires 5000 TaskTracker VMs (one for each split)?

Comment: If the job has 5000 splits, then it requires 5000 VMs (one for each split), but the TaskTracker itself is a framework daemon. A TaskTracker is a process (JVM) that handles/manages tasks (the processes processing a split) on a node, and is identified by (node-hostname + port). A task is never executed inside the TaskTracker's own JVM; a new JVM is spawned for it. The reason is that faulty user code (map/reduce) should not bring down the TaskTracker (a framework process). But with hadoop-0.19 we have JVM reuse, and hence 5000 splits might not require 5000 VMs. Note that tasks might get speculatively executed near the end, which can add to the VM count.
Amar

===
The client is unblocked once the job is submitted. The way it works is as follows:
- jobclient requests the jobtracker for a unique job id
- jobclient does some sanity checks (does the output folder exist, etc.) ...
- jobclient uploads the job files (xml, jar, split) onto a known location called the System-Directory
===

[Ricky] Is this a well-known folder within HDFS?

This is set using mapred.system.dir during cluster startup (see hadoop-default.conf). It's a framework directory.
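To make the job.split answer concrete: each entry is just metadata describing where a split's bytes live, not the bytes themselves. A simplified sketch, modeled loosely on Hadoop's FileSplit (the class and field names here are illustrative, not the actual serialized format):

```java
// Simplified sketch of the metadata a split entry carries (modeled on
// Hadoop's FileSplit). job.split stores records like this; the actual
// input data stays in HDFS and is read by the map task at run time.
class SplitInfo {
    final String file;    // path of the input file
    final long start;     // byte offset where this split begins
    final long length;    // number of bytes in this split
    final String[] hosts; // nodes holding replicas, used for locality-aware scheduling

    SplitInfo(String file, long start, long length, String[] hosts) {
        this.file = file;
        this.start = start;
        this.length = length;
        this.hosts = hosts;
    }
}
```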
Re: Hadoop Internal Architecture writeup
On Nov 28, 2008, at 9:45 AM, Ricky Ho wrote:

[Ricky] What exactly does job.split contain? I assume it contains the specification for each split (but not its data), such as the corresponding file and the byte range within that file. Correct?

Yes.

[Ricky] I am curious why the reduce execution can't start earlier (before all the map tasks have completed).

The contract is that each reduce is given its keys in sorted order. The reduce can't start until it is sure it has the first key, and that can only happen after all the maps have finished.

[Ricky] Do you mean that if the job has 5000 splits, then it requires 5000 TaskTracker VMs (one for each split)?

In 0.19, you can enable the framework to re-use JVMs between tasks in the same job. Look at HADOOP-249.

[Ricky] Is this a well-known folder within HDFS?

It is configured by the cluster admin. However, applications should *not* depend on the contents or even the visibility of that directory. It will almost certainly become inaccessible to clients as part of increasing security.

-- Owen
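For reference, both settings mentioned above live in the cluster configuration. A sketch of the relevant properties as of the 0.19-era config (the values shown are examples, not recommendations; mapred.job.reuse.jvm.num.tasks is the knob introduced by HADOOP-249):

```xml
<!-- hadoop-site.xml sketch; example values only -->
<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
  <!-- 1 = one JVM per task (the old behavior); -1 = reuse a JVM for
       any number of tasks of the same job -->
  <value>-1</value>
</property>
<property>
  <name>mapred.system.dir</name>
  <!-- framework-only HDFS directory for submitted job files;
       applications should not depend on its contents -->
  <value>/hadoop/mapred/system</value>
</property>
```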
RE: Hadoop Internal Architecture writeup
Does this mean that within a reduce task, all reduce() calls (even for different keys) have to happen sequentially within a single thread? If so, this contract also restricts the use of multiple threads (i.e., a different thread for each key). Isn't it true that in most cases the application doesn't care about the order of keys? If the application requires certain keys to arrive at the same node, then it will be difficult to improve parallelism just by adding more reducer nodes. Or is this contract a configurable option? Maybe I am missing the rationale behind this contract? Can you shed some light?

Rgds, Ricky

[Ricky] I am curious why the reduce execution can't start earlier (before all the map tasks have completed).

The contract is that each reduce is given its keys in sorted order. The reduce can't start until it is sure it has the first key, and that can only happen after all the maps have finished.
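The sorted-order contract being debated here can be shown with a toy sketch: map outputs are merged into sorted key order, then a reduce function is invoked once per key, sequentially, on one thread. This is a simplified simulation (a TreeMap stands in for the real merge-sort of spilled map outputs), not Hadoop code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy sketch of the reduce-side contract: map outputs are merged into
// sorted order, then reduce(key, values) runs once per key, in key
// order, on a single thread. The "reduce" here just sums the values.
class ReducePhaseSketch {
    static List<String> run(List<Map.Entry<String, Integer>> mapOutputs) {
        // Merge/sort step: group values by key in sorted key order.
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapOutputs) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
                   .add(kv.getValue());
        }
        // Sequential reduce calls, one per key, in sorted order.
        List<String> results = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) sum += v;
            results.add(e.getKey() + "=" + sum);
        }
        return results;
    }
}
```

The sketch makes the cost of the contract visible: no result for any key can be emitted until every map output has been folded into the sorted structure, which is exactly why the reduce phase waits for all maps.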
Hadoop Internal Architecture writeup
I put together an article describing the internal architecture of Hadoop (HDFS, MapRed). I'd love to get some feedback if you see anything inaccurate or missing ... http://horicky.blogspot.com/2008/11/hadoop-mapreduce-implementation.html Rgds, Ricky
Re: Hadoop Internal Architecture writeup
Just glanced into it; haven't read it in detail. One correction about the Secondary NameNode - it is not a hot standby. See http://wiki.apache.org/hadoop/FAQ#7

Ricky Ho wrote:
I put together an article describing the internal architecture of Hadoop (HDFS, MapRed). I'd love to get some feedback if you see anything inaccurate or missing ... http://horicky.blogspot.com/2008/11/hadoop-mapreduce-implementation.html
Rgds, Ricky
Re: Hadoop Internal Architecture writeup
Ricky Ho wrote:
I put together an article describing the internal architecture of Hadoop (HDFS, MapRed). I'd love to get some feedback if you see anything inaccurate or missing ... http://horicky.blogspot.com/2008/11/hadoop-mapreduce-implementation.html
Rgds, Ricky

A few comments on MR:

1) "The JobTracker will first determine the number of splits (each split is configurable, ~16-64MB) from the input path, and select some TaskTrackers based on their network proximity to the data sources, then the JobTracker sends the task requests to those selected TaskTrackers."

The jobclient, while submitting the job, calculates the splits using the InputFormat specified by the user. Internally the InputFormat might make use of the dfs block size, user-hinted num-maps, etc. The jobtracker is given 3 files:
- job.xml : job control parameters
- job.split : the split file
- job.jar : user map-reduce code
Hence the JobTracker never actually *calculates* the splits. Also, the TaskTracker asks for tasks. It's a pull model where the tracker asks and the jobtracker schedules based on (node/rack/switch) locality. The JobTracker never initiates scheduling.

2) "For each record parsed by the InputFormat ..."

To be more precise, by the RecordReader included in the InputFormat.

3) "A periodic wakeup process will sort the memory buffer into different reducer nodes by invoking the combine function."

Need to cross-check whether it's a periodic wakeup process or an on-demand thread that is spawned once the buffer is nearly full. Btw, the function that determines which key-value pair goes to which reducer is called the Partitioner. The Combiner is just an optimization that does a local merge/reduction/aggregation of the data before sending it over the network.

4) "When all the TaskTrackers are done, the JobTracker will notify the selected TaskTrackers for the reduce phase."

This process is interleaved/parallelized. As soon as a map is done, the JobTracker is notified. Once a tracker (with a reducer) asks for events, these new events are passed. Hence the map-output pulling (Shuffle Phase) works in parallel with the Map Phase. The Reduce Phase can start only once all the (respective) map outputs are copied and merged.

5) "The JobTracker keeps track of the progress of each phase and periodically pings the TaskTrackers for their health status."

It's a push rather than a pull: the trackers report their status instead of the JobTracker asking them.

6) "When any map-phase TaskTracker crashes, the JobTracker will reassign the map task to a different TaskTracker node, which will rerun all the assigned splits."

There is a 1-1 mapping between a split and a map task. Hence it will re-run the map only on the corresponding split.

7) "After both phases complete, the JobTracker will unblock the client program."

The client is unblocked once the job is submitted. The way it works is as follows:
- jobclient requests the jobtracker for a unique job id
- jobclient does some sanity checks (does the output folder exist, etc.) ...
- jobclient uploads the job files (xml, jar, split) onto a known location called the System-Directory
- jobclient informs the jobtracker that the files are ready, and the jobtracker returns control
It's not necessary that the jobclient always stays blocked on job submission.

Amar
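To illustrate the Partitioner-vs-Combiner distinction from point 3: the default partitioning rule in Hadoop (its HashPartitioner) is essentially a hash of the key modulo the number of reducers, which guarantees all values for a key reach the same reducer. A self-contained sketch of that rule:

```java
// Sketch of the default partitioning rule (Hadoop's HashPartitioner
// does essentially this): a key's hash deterministically picks one of
// numReduceTasks reducers, so every value for a given key lands on the
// same reducer regardless of which map emitted it.
class HashPartitionerSketch {
    static int getPartition(Object key, int numReduceTasks) {
        // Mask off the sign bit so the result is always non-negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

The Combiner, by contrast, is orthogonal: it runs reduce-like aggregation on each map's local output within a partition before the shuffle, shrinking the data sent over the network.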
Re: Hadoop Internal Architecture writeup
Hi Ricky,

As a newcomer to MR and Hadoop, I think what you are doing is a great addition to the docs. One thing I would like to see in this overview is how JVMs are spawned in the process - e.g. is it 1 JVM per node per job, or per node per task, etc.? The reason is that it has implications for setting up JVM-wide in-memory caches for the Maps/Reducers to utilise (e.g. a big lookup hashtable which the Map will use). I assume it is a JVM per task per node, to allow the TaskTracker to kill the process and restrict memory per task, but I am not sure.

Thanks again,
Tim