Hadoop Internal Architecture writeup (failure handling)

2008-12-01 Thread Ricky Ho
More questions on component failure handling.  Can anyone confirm (or correct) 
the following?

1) When a TaskTracker crashes, the JobTracker, having not heard its heartbeat 
within a timeout period, will conclude that it has crashed and re-allocate its 
unfinished tasks to other TaskTrackers.  Correct?
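
(For what it's worth, that timeout is a configurable cluster setting rather than a fixed value. A minimal sketch of reading it with the plain Configuration API; the key name mapred.tasktracker.expiry.interval and the 10-minute default are my assumptions about the 0.19-era property, not something confirmed in this thread:)

import org.apache.hadoop.conf.Configuration;

public class TrackerExpiry {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumed key/default: how long the JobTracker waits without a heartbeat
    // before declaring a TaskTracker lost and re-scheduling its tasks.
    long expiryMs = conf.getLong("mapred.tasktracker.expiry.interval", 600000L);
    System.out.println("TaskTracker considered lost after " + expiryMs + " ms");
  }
}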

2) If the original TaskTracker is merely overloaded rather than crashed, what 
happens if it submits its task results afterwards?

3) When a DataNode crashes, the NameNode, having not heard its heartbeat within 
a timeout period, will conclude that it has crashed.  The NameNode will then 
gradually re-replicate the data chunks of the failed DataNode onto other 
DataNodes to restore the replication factor.  Correct?
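
(Side note: the replication factor being restored here is a per-file setting that clients can also adjust directly. A minimal sketch using the FileSystem API; the path /data/sample.txt is just a hypothetical example:)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReplicationExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/data/sample.txt");              // hypothetical file
    // Replication factor currently recorded by the NameNode for this file.
    short current = fs.getFileStatus(file).getReplication();
    // Ask the NameNode to maintain 3 replicas; re-replication after a
    // DataNode failure works toward this same per-file target.
    fs.setReplication(file, (short) 3);
    System.out.println("replication: " + current + " -> 3");
  }
}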

4) Now what if the crashed DataNode reboots after an hour and rejoins the 
cluster?  It will report to the NameNode which chunks it has.  How does the 
NameNode recognize which chunks are outdated?  Using some kind of version 
number?

5) After the NameNode detects that certain chunks are outdated, does it simply 
discard them (and garbage-collect them later), or does it try to bring them up 
to date (e.g. send them all the deltas)?

6) When the NameNode crashes, no HDFS writes can proceed, but HDFS reads can 
still proceed if the client already holds a chunk handle to the DataNode.  
Correct?  This situation continues until the NameNode is recovered, at which 
point things go back to normal.  Correct?

7) When the JobTracker crashes, all jobs that have not completed are 
discarded.  When the JobTracker restarts, the client needs to resubmit all 
those jobs.  Correct?

Rgds,
Ricky




RE: Hadoop Internal Architecture writeup

2008-11-30 Thread Amar Kamat
Hey, nice work and nice writeup. Keep it up.
Comments inline.
Amar


-Original Message-
From: Ricky Ho [mailto:[EMAIL PROTECTED]
Sent: Fri 11/28/2008 9:45 AM
To: core-user@hadoop.apache.org
Subject: RE: Hadoop Internal Architecture writeup
 
Amar, thanks a lot.  This is exactly the kind of feedback that I am looking for 
...  I have some more questions ...

==
The jobclient while submitting the job calculates the split using
InputFormat which is specified by the user. Internally the InputFormat
might make use of dfs-block size, user-hinted num-maps etc. The
jobtracker is given 3 files
- job.xml : job control parameters
- job.split : the split file
- job.jar : user map-reduce code
==
[Ricky]  What exactly does job.split contain?  I assume it contains the 
specification for each split (but not its data), such as the corresponding file 
and the byte range within that file.  Correct?
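
(To make the question concrete: in the old mapred API a file-based split is a FileSplit, which carries exactly that kind of metadata and no data. A minimal sketch, assuming the FileSplit constructor that takes preferred hosts; the path and host names are made up:)

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

public class SplitContents {
  public static void main(String[] args) {
    // A split is only a description of a byte range plus location hints;
    // the actual records are read later by the RecordReader on the map node.
    FileSplit split = new FileSplit(new Path("/data/input/part-0"),   // file
                                    0L,                               // start offset
                                    64L * 1024 * 1024,                // length (~one block)
                                    new String[] {"node1", "node2"}); // preferred hosts
    System.out.println(split.getPath() + " @" + split.getStart()
        + "+" + split.getLength());
  }
}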


This process is interleaved/parallelized. As soon as a map is done, the
JobTracker is notified. Once a tracker (with a reducer) asks for events,
these new events are passed. Hence the map output pulling (Shuffle
Phase) works in parallel with the Map Phase. Reduce Phase can start only
once all the (resp) map outputs are copied and merged.
=
[Ricky]  I am curious why the reduce execution can't start earlier (before all 
the map tasks have completed).  The value iterator inside the user-defined 
reduce() method could be blocked to wait for more map tasks to complete.  In 
other words, map() and reduce() could also proceed with pipeline parallelism.


==
There is a 1-1 mapping between a split and a map task. Hence it will
re-run the map on the corresponding split.
==
[Ricky]  Do you mean that if the job has 5000 splits, it requires 5000 
TaskTracker VMs (one for each split)?

comment:
If the job has 5000 splits, then it requires 5000 task VMs (one for each split). 
The TaskTracker itself is a framework daemon: a process (JVM) that 
handles/manages tasks (the processes that actually process a split) on a node. 
A TaskTracker is identified by (node-hostname + port). A task is never executed 
inside the TaskTracker's JVM; a new JVM is spawned for it. The reason is that 
faulty user code (map/reduce) should not bring down the TaskTracker (a framework 
process). But with hadoop-0.19 we have JVM reuse, and hence 5000 splits might 
require fewer than 5000 VMs. Note that some tasks might get speculatively 
executed towards the end, which can add to the VM count.
Amar
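
(On the speculation point above: speculative execution is a per-job switch, so the extra VMs it may cost can be traded away when tasks run uniformly. A small sketch of the old-API toggles; the property names in the comment are the pre-0.20 ones as I recall them:)

import org.apache.hadoop.mapred.JobConf;

public class SpeculationConfig {
  public static void main(String[] args) {
    JobConf conf = new JobConf(SpeculationConfig.class);
    // Launch backup attempts for straggling map tasks, but not for reduces.
    conf.setMapSpeculativeExecution(true);
    conf.setReduceSpeculativeExecution(false);
    // Equivalent properties (believed names):
    //   mapred.map.tasks.speculative.execution
    //   mapred.reduce.tasks.speculative.execution
    System.out.println("map speculation: " + conf.getMapSpeculativeExecution());
  }
}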


===
The client is unblocked once the job is submitted. The way it works is
as follows :
- jobclient requests the jobtracker for a unique job id
- jobclient does some sanity checks to see if the output folder exists
etc ...
- jobclient uploads job files (xml, jar, split) onto a known location
called System-Directory

[Ricky]  Is this a well-known folder within HDFS?

This is set using mapred.system.dir during cluster startup (see 
hadoop-default.xml). It's a framework directory.
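
(A minimal sketch of looking that directory up from the client side. It assumes JobClient exposes a getSystemDir() accessor, which is my reading of the old API rather than something stated in this thread:)

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class SystemDirExample {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(SystemDirExample.class);
    JobClient client = new JobClient(conf);
    // The framework staging area configured via mapred.system.dir; the
    // jobclient uploads job.xml, job.jar and job.split underneath it.
    System.out.println("system dir: " + client.getSystemDir());
  }
}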




RE: Hadoop Internal Architecture writeup

2008-11-28 Thread Ricky Ho
Amar, thanks a lot.  This is exactly the kind of feedback that I am looking for 
...  I have some more questions ...

==
The jobclient while submitting the job calculates the split using
InputFormat which is specified by the user. Internally the InputFormat
might make use of dfs-block size, user-hinted num-maps etc. The
jobtracker is given 3 files
- job.xml : job control parameters
- job.split : the split file
- job.jar : user map-reduce code
==
[Ricky]  What exactly does job.split contain?  I assume it contains the 
specification for each split (but not its data), such as the corresponding file 
and the byte range within that file.  Correct?


This process is interleaved/parallelized. As soon as a map is done, the
JobTracker is notified. Once a tracker (with a reducer) asks for events,
these new events are passed. Hence the map output pulling (Shuffle
Phase) works in parallel with the Map Phase. Reduce Phase can start only
once all the (resp) map outputs are copied and merged.
=
[Ricky]  I am curious why the reduce execution can't start earlier (before all 
the map tasks have completed).  The value iterator inside the user-defined 
reduce() method could be blocked to wait for more map tasks to complete.  In 
other words, map() and reduce() could also proceed with pipeline parallelism.


==
There is a 1-1 mapping between a split and a map task. Hence it will
re-run the map on the corresponding split.
==
[Ricky]  Do you mean that if the job has 5000 splits, it requires 5000 
TaskTracker VMs (one for each split)?

===
The client is unblocked once the job is submitted. The way it works is
as follows :
- jobclient requests the jobtracker for a unique job id
- jobclient does some sanity checks to see if the output folder exists
etc ...
- jobclient uploads job files (xml, jar, split) onto a known location
called System-Directory

[Ricky]  Is this a well-known folder within HDFS?



Re: Hadoop Internal Architecture writeup

2008-11-28 Thread Owen O'Malley


On Nov 28, 2008, at 9:45 AM, Ricky Ho wrote:

[Ricky]  What exactly does job.split contain?  I assume it contains the 
specification for each split (but not its data), such as the corresponding 
file and the byte range within that file.  Correct?


Yes

[Ricky]  I am curious why the reduce execution can't start earlier (before 
all the map tasks have completed).


The contract is that each reduce is given the keys in sorted order.  
The reduce can't start until it is sure it has the first key. That can  
only happen after the maps are all finished.
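
(In code terms, that contract is visible in the old-API reduce signature: the framework hands reduce() one key at a time, in sorted order, together with an iterator over all values for that key, which is only possible once every map output has been merged. A minimal word-count-style sketch:)

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SumReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
  // Called once per key, with keys arriving in sorted order; 'values'
  // already holds every value emitted for this key by every map task.
  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output,
                     Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}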


[Ricky]  Do you mean that if the job has 5000 splits, it requires 5000 
TaskTracker VMs (one for each split)?


In 0.19, you can enable the framework to re-use jvms between tasks in  
the same job. Look at HADOOP-249.
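
(A small sketch of that 0.19 knob as I understand HADOOP-249; the setter and property name below are my best guess at what shipped, so double-check against your release:)

import org.apache.hadoop.mapred.JobConf;

public class JvmReuseConfig {
  public static void main(String[] args) {
    JobConf conf = new JobConf(JvmReuseConfig.class);
    // Run up to 10 tasks of this job sequentially in one child JVM;
    // -1 would mean unlimited, and 1 restores one JVM per task.
    conf.setNumTasksToExecutePerJvm(10);
    // Believed-equivalent property: mapred.job.reuse.jvm.num.tasks
    System.out.println("tasks per JVM: " + conf.getNumTasksToExecutePerJvm());
  }
}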



[Ricky]  Is this a well-known folder within HDFS?


It is configured by the cluster admin. However, applications should  
*not* depend on the contents or even visibility of that directory. It  
will almost certainly become inaccessible to clients as part of  
increasing security.


-- Owen


RE: Hadoop Internal Architecture writeup

2008-11-28 Thread Ricky Ho
Does this mean that within a TaskTracker, all reduce() calls (even for 
different keys) have to be made sequentially within a single thread?  If so, 
this contract also restricts the use of multiple threads (i.e. a different 
thread for each key).

Isn't it true that in most cases the application doesn't care about the order 
of keys?  Because if the application requires certain keys to arrive at the 
same node, it will be difficult to improve parallelism just by adding more 
reducer nodes.

Or is this contract a configurable option?

Maybe I am missing the rationale behind this contract?  Can you shed some 
light?

Rgds, Ricky

 [Ricky]  I am curious why the reduce execution can't start earlier (before
 all the map tasks have completed).

The contract is that each reduce is given the keys in sorted order.
The reduce can't start until it is sure it has the first key. That can
only happen after the maps are all finished.




Hadoop Internal Architecture writeup

2008-11-27 Thread Ricky Ho
I put together an article describing the internal architecture of Hadoop (HDFS, 
MapRed).  I'd love to get some feedback if you see anything inaccurate or 
missing ...
http://horicky.blogspot.com/2008/11/hadoop-mapreduce-implementation.html

Rgds,
Ricky


Re: Hadoop Internal Architecture writeup

2008-11-27 Thread Sharad Agarwal
Just glanced at it; haven't read it in detail. One correction about the 
Secondary NameNode - it is not a hot standby.

see http://wiki.apache.org/hadoop/FAQ#7

Ricky Ho wrote:

I put together an article describing the internal architecture of Hadoop (HDFS, 
MapRed).  I'd love to get some feedback if you see anything inaccurate or 
missing ...
http://horicky.blogspot.com/2008/11/hadoop-mapreduce-implementation.html

Rgds,
Ricky
  




Re: Hadoop Internal Architecture writeup

2008-11-27 Thread Amar Kamat

Ricky Ho wrote:

I put together an article describing the internal architecture of Hadoop (HDFS, 
MapRed).  I'd love to get some feedback if you see anything inaccurate or 
missing ...
http://horicky.blogspot.com/2008/11/hadoop-mapreduce-implementation.html
  

A few comments on MR:
1) The JobTracker will first determine the number of splits (each split 
is configurable, ~16-64MB) from the input path, and select some 
TaskTrackers based on their network proximity to the data sources; then 
the JobTracker sends the task requests to those selected TaskTrackers.
The jobclient while submitting the job calculates the split using 
InputFormat which is specified by the user. Internally the InputFormat 
might make use of dfs-block size, user-hinted num-maps etc. The 
jobtracker is given 3 files

- job.xml : job control parameters
- job.split : the split file
- job.jar : user map-reduce code
Hence the JobTracker never actually *calculates* the split.
Also, the task-tracker asks for the task. It's a pull model where the 
tracker asks and the jobtracker schedules based on (node/rack/switch) 
locality. The JobTracker never actually initiates scheduling.
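
(A minimal sketch of the client-side half of this, assuming a plain text input under a hypothetical /data/input path: the user-specified InputFormat, not the JobTracker, turns the input into splits, and those descriptions are what end up in job.split:)

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class SplitCalculation {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(SplitCalculation.class);
    FileInputFormat.setInputPaths(conf, new Path("/data/input"));
    TextInputFormat format = new TextInputFormat();
    format.configure(conf);
    // The InputFormat computes the splits on the client, internally using
    // the dfs block size and the hinted number of maps.
    InputSplit[] splits = format.getSplits(conf, 4 /* num-maps hint */);
    System.out.println("client computed " + splits.length + " splits");
  }
}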


2) For each record parsed by the “InputFormat”
To be more precise, by the RecordReader included in the InputFormat.
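
(A minimal sketch of what that looks like at the API level: the map task asks the InputFormat for a RecordReader over its split and iterates records from it. TextInputFormat is used here purely as an example:)

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class ReadOneSplit {
  // Reads every record of one split, roughly the way a map task's runner does.
  static void dumpSplit(TextInputFormat format, InputSplit split, JobConf conf)
      throws Exception {
    RecordReader<LongWritable, Text> reader =
        format.getRecordReader(split, conf, Reporter.NULL);
    LongWritable key = reader.createKey();   // byte offset within the file
    Text value = reader.createValue();       // one line of text
    while (reader.next(key, value)) {
      System.out.println(key + "\t" + value);
    }
    reader.close();
  }
}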

3) A periodic wakeup process will sort the memory buffer into different 
reducer nodes by invoking the “combine” function.
Need to cross-check whether it's a wakeup process or an on-demand thread that 
is spawned once the buffer is nearly full. Btw, the function that determines 
which key-value pair goes to which reducer is called the Partitioner. The 
Combiner is just an optimization that does a local 
merge/reduction/aggregation of the data before sending it over the network.
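
(A minimal sketch of the distinction: the Partitioner decides which reduce task receives a given key, while the Combiner is just a reducer-style aggregation applied to the map-side buffer. The partitioner below mirrors what the default HashPartitioner does; the combiner wiring is left as a comment since any Reducer with matching types can be plugged in:)

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class WiringExample {
  // Partitioner: picks the reduce task for each map output key.
  public static class ByKeyHash implements Partitioner<Text, IntWritable> {
    public void configure(JobConf job) {}
    public int getPartition(Text key, IntWritable value, int numPartitions) {
      return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
  }

  public static void wire(JobConf conf) {
    conf.setPartitionerClass(ByKeyHash.class);
    // Combiner: optional local aggregation of map output before the shuffle,
    // e.g. conf.setCombinerClass(SomeReducer.class);
    conf.setNumReduceTasks(4); // this is the numPartitions seen above
  }
}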


4) When all the TaskTrackers are done, the JobTracker will notify the 
selected TaskTrackers for the reduce phase.
This process is interleaved/parallelized. As soon as a map is done, the 
JobTracker is notified. Once a tracker (with a reducer) asks for events, 
these new events are passed. Hence the map output pulling (Shuffle 
Phase) works in parallel with the Map Phase. Reduce Phase can start only 
once all the (resp) map outputs are copied and merged.


5) The JobTracker keeps track of the progress of each phase and 
periodically pings the TaskTrackers for their health status.
It's again a push rather than a pull. The trackers report their status 
instead of the JobTracker asking them.


6) When any of the map-phase TaskTrackers crashes, the JobTracker will 
reassign the map task to a different TaskTracker node, which will rerun 
all the assigned splits.
There is a 1-1 mapping between a split and a map task. Hence it will 
re-run the map on the corresponding split.


7) After both phases complete, the JobTracker will unblock the client 
program.
The client is unblocked once the job is submitted. The way it works is 
as follows :

- jobclient requests the jobtracker for a unique job id
- jobclient does some sanity checks to see if the output folder exists 
etc ...
- jobclient uploads job files (xml, jar, split) onto a known location 
called System-Directory
- jobclient informs the jobtracker that the files are ready and the 
jobtracker returns the control.


It's not necessary that the jobclient always blocks on job submission.

Amar
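
(A minimal sketch of both behaviours with the old JobClient API: runJob() blocks and reports progress until the job finishes, while submitJob() returns as soon as the job files are staged and accepted, leaving the caller to poll. Job setup is omitted here:)

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

public class SubmitModes {
  // Blocking: submits the job, then polls and prints progress until it finishes.
  public static void blocking(JobConf conf) throws Exception {
    JobClient.runJob(conf);
  }

  // Non-blocking: control returns right after submission; poll the handle.
  public static void nonBlocking(JobConf conf) throws Exception {
    JobClient client = new JobClient(conf);
    RunningJob job = client.submitJob(conf);
    while (!job.isComplete()) {
      System.out.printf("map %.0f%% reduce %.0f%%%n",
          job.mapProgress() * 100, job.reduceProgress() * 100);
      Thread.sleep(5000);
    }
  }
}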

Rgds,
Ricky
  




Re: Hadoop Internal Architecture writeup

2008-11-27 Thread tim robertson
Hi Ricky,

As a newcomer to MR and Hadoop I think what you are doing is a great
addition to the docs.  One thing I would like to see in this overview
is how JVMs are spawned in the process - e.g. is it 1 JVM per node
per job, or per node per task, etc.  The reason is that it has
implications for setting up JVM-wide in-memory caches for the Maps /
Reducers to utilise (e.g. a big lookup hashtable which the Map will
use).  I assume it is a JVM per task per node, to allow the task
tracker to kill the process and restrict memory per task, but I am not
sure.

Thanks again

Tim
