Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The "NativeMapReduce" page has been changed by Aniket Mokashi.
http://wiki.apache.org/pig/NativeMapReduce?action=diff&rev1=6&rev2=7

--------------------------------------------------

  == Introduction ==
  Pig needs to provide a way to natively run map reduce jobs written in Java.
  There are some advantages to this-
-  1. The advantage of the ''native'' keyword is that the user need not worry about 
coordination between the jobs; Pig will take care of it.
+  1. The advantage of the ''mapreduce'' keyword is that the user need not worry about 
coordination between the jobs; Pig will take care of it.
   2. Users can make use of existing Java applications without being Java 
programmers.
  
  == Syntax ==
  To support native mapreduce jobs, Pig will support the following syntax-
  {{{
  X = ... ;
- Y = NATIVE ('mymr.jar' [, 'other.jar' ...]) STORE X INTO 'storeLocation' 
USING storeFunc LOAD 'loadLocation' USING loadFunc [params, ... ];
+ Y = MAPREDUCE ('mymr.jar' [, 'other.jar' ...]) STORE X INTO 'storeLocation' 
USING storeFunc LOAD 'loadLocation' USING loadFunc [params, ... ];
  }}}
  
  This stores '''X''' into '''storeLocation''' using '''storeFunc''', from which 
the native mapreduce job then reads its data. After mymr.jar's mapreduce job has 
run, we load the data back from '''loadLocation''' into alias '''Y''' using 
'''loadFunc'''.
  
- params are extra parameters required for native mapreduce job (TBD).
+ params are extra parameters required for the native mapreduce job.
  
- mymr.jar is complaint with pig specification (see below).
+ '''mymr.jar is any mapreduce jar file which can be run through the "hadoop jar 
mymr.jar params" command.'''
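  
  For example, a hypothetical wordcount job packaged as wordcount.jar could be 
invoked as shown below (the jar name, the paths, the use of PigStorage, and the 
way the trailing params are consumed by the jar are illustrative assumptions, not 
part of this proposal):
  {{{
  A = LOAD '/user/data/docs' USING PigStorage();
  -- store A where wordcount.jar expects its input, run the jar, load its output back
  B = MAPREDUCE ('wordcount.jar') STORE A INTO '/tmp/wc_in' USING PigStorage() 
      LOAD '/tmp/wc_out' USING PigStorage() '/tmp/wc_in', '/tmp/wc_out';
  }}}
  Here '/tmp/wc_in' and '/tmp/wc_out' would also be handed to wordcount.jar as its 
command line arguments, matching the "hadoop jar mymr.jar params" invocation above.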
  
  == Comparison with similar features ==
  === Pig Streaming ===
@@ -38, +38 @@

  
  With native job support, Pig can support native map reduce jobs written in Java 
that convert a data set into a different data set by applying custom map reduce 
functions of any complexity.
  
- == Native Mapreduce job specification ==
- Native Mapreduce job needs to conform to some specification defined by Pig. 
This is required because Pig specifies the input and output directory in the 
script for this job and is responsible for managing the coordination of the 
native job with the remaining pig mapreduce jobs. Pig also might need to 
provide some extra configuration like job name, input/output formats, 
parallelism to the native job. For communicating such parameters to the native 
job, it should be according to specification provided by Pig.
- 
- Following are some of the approaches of achieving this-
-  1. '''Ordered inputLoc/outputLoc parameters'''- This is simplistic approach 
wherein native programs follow up a convention so that their first and second 
parameters are treated as input and output respectively. Pig ''native'' command 
takes the parameters required by the native mapreduce job and passes it to 
native job as command line arguments. It is upto the native program to use 
these parameters for operations it performs.
- Thus, only following lines of code are mandatory inside the native program.
- {{{
- FileInputFormat.setInputPaths(conf, new Path(args[0]));  
- FileOutputFormat.setOutputPath(conf, new Path(args[1]));
- }}}
-  1.#2 '''getJobConf Function'''- Native jobs implement '''getJobConf''' 
method which returns ''org.apache.hadoop.mapred.JobConf'' object so that pig 
can construct a ''job'' and schedule that inside pigs ''jobcontrol'' job. This 
also provides a way to add more pig specific parameters to this job before it 
is submitted. Most of the current native hadoop program create JobConf's and 
run hadoop jobs with ''JobClient.runJob(conf)''. These applications need to 
change their code to a getJobConf function so that pig can hook into them to 
get the conf. This will also allow pig to set the input and output directory 
for the native job.
- For example-
- {{{
- public static JobConf getJobConf(String[] args) {
-     JobConf conf = new JobConf(WordCount.class);
-     conf.setJobName("wordcount");
-     
-     conf.setOutputKeyClass(Text.class);
-     conf.setOutputValueClass(IntWritable.class);
- 
-     conf.setMapperClass(Map.class);
-     conf.setCombinerClass(Reduce.class);
-     conf.setReducerClass(Reduce.class);
-     
-     conf.setInputFormat(TextInputFormat.class);
-     conf.setOutputFormat(TextOutputFormat.class);
-     
-     FileInputFormat.setInputPaths(conf, new Path(args[0]));
-     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
-     return conf;
- }
- public static void main(String[] args) throws Exception { 
-     JobClient.runJob(getJobConf(args));
- }
- }}}
  == Implementation Details ==
  {{{
  X = ... ;
- Y = NATIVE ('mymr.jar' [, 'other.jar' ...]) STORE X INTO 'storeLocation' 
USING storeFunc LOAD 'loadLocation' USING loadFunc [params, ... ];
+ Y = MAPREDUCE ('mymr.jar' [, 'other.jar' ...]) STORE X INTO 'storeLocation' 
USING storeFunc LOAD 'loadLocation' USING loadFunc [params, ... ];
  }}}
  Logical Plan- The logical plan creates a LONative operator with an internal plan 
that consists of a store and a load operator. The store operator cannot be 
attached to X at this level, as it would start storing X at storeLocation for 
every plan that includes X, which is not intended. Although we could create the 
LOLoad operator for Y at this point, we delay this to the physical plan and track 
it with the LONative operator. Also, since Y has a dependency on X, we add the 
plan of Y whenever we see the plan for X in ''registerQuery''.
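  
  For example (a hypothetical script, not taken from this page), suppose X also 
feeds an ordinary Pig branch:
  {{{
  X = LOAD 'data' USING PigStorage();
  -- the native job branch and a normal Pig branch both consume X
  Y = MAPREDUCE ('mymr.jar') STORE X INTO 'storeLocation' USING PigStorage() 
      LOAD 'loadLocation' USING PigStorage();
  Z = DISTINCT X;
  STORE Z INTO 'otherOutput';
  }}}
  If only the Z branch were executed, X should not be written to storeLocation; 
keeping the store inside the LONative operator's internal plan ties it to Y's 
plan alone.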
  
