Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The "NativeMapReduce" page has been changed by Aniket Mokashi.
http://wiki.apache.org/pig/NativeMapReduce?action=diff&rev1=5&rev2=6

--------------------------------------------------

- = Page under construction =
- 
  #format wiki
  #language en
  
@@ -18, +16 @@

  
  == Syntax ==
  To support native mapreduce jobs, pig will support the following syntax-
- 
  {{{
  X = ... ;
  Y = NATIVE ('mymr.jar' [, 'other.jar' ...]) STORE X INTO 'storeLocation' 
USING storeFunc LOAD 'loadLocation' USING loadFunc [params, ... ];
@@ -35, +32 @@

  The purpose of [[#ref2|pig streaming]] is to send data through an external script or program, transforming a dataset into a different dataset using a custom script written in any programming or scripting language. Pig streaming uses hadoop streaming support to achieve this. Pig can register custom programs in a script, either inline in the stream clause or using a define clause. Pig also provides a level of data guarantees on the data processing, features for job management, and the ability to ship the scripts via the distributed cache (configurable). Streaming applications run locally on the individual mapper and reducer nodes to transform the data.
  
  === Hive Transforms ===
- With [[#ref3|hive transforms]], users can also plug in their own custom 
mappers and reducers in the data stream. Basically, it is also an application 
of custom streaming supported by hadoop. Thus, these mappers and reducers can 
be written in any scripting languages and can be registered to distributed 
cache to help performance. To support custom map reduce programs written in 
java ([[#ref4|bezo's blog]]), we can use our custom mappers and reducers as 
data streaming functions and use them to transform the data using 'java -cp 
mymr.jar'. This will not invoke a map reduce task but will attempt to transform 
the data during the map or the reduce task (locally).
+ With [[#ref3|hive transforms]], users can also plug their own custom mappers and reducers into the data stream. Basically, this is another application of the custom streaming supported by hadoop. These mappers and reducers can therefore be written in any scripting language and can be registered in the distributed cache to help performance. To support custom map reduce programs written in java ([[#ref4|bizo's blog]]), we can use our custom mappers and reducers as data streaming functions and use them to transform the data using 'java -cp mymr.jar'. This will not invoke a map reduce task but will transform the data during the map or the reduce task (locally).
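+ 
+ As an illustration, the kind of program that could be plugged in this way might look like the following sketch (the class name ''UpperCaseMapper'' and the record layout are hypothetical, not an existing API); it follows the streaming contract of reading tab-separated records from stdin and writing transformed records to stdout, and could be invoked as 'java -cp mymr.jar UpperCaseMapper'-
+ {{{
+ import java.io.BufferedReader;
+ import java.io.InputStreamReader;
+ 
+ // Hypothetical streaming transform: reads tab-separated records from stdin,
+ // upper-cases the first field and writes the records back to stdout.
+ public class UpperCaseMapper {
+     public static void main(String[] args) throws Exception {
+         BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+         String line;
+         while ((line = in.readLine()) != null) {
+             String[] fields = line.split("\t", -1);
+             StringBuilder out = new StringBuilder(fields[0].toUpperCase());
+             for (int i = 1; i < fields.length; i++) {
+                 out.append('\t').append(fields[i]);
+             }
+             System.out.println(out.toString());
+         }
+     }
+ }
+ }}}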
  
  Thus, both of these features can transform data submitted to a map reduce job (mapper) into a different data set and/or transform data produced by a mapreduce job (reducer) into a different data set. But we should note that the data transformation takes place on a single machine and does not take advantage of the map reduce framework itself. Also, these constructs only allow custom transformations inside the data pipeline and do not break the pipeline.
  
@@ -45, +42 @@

  A native mapreduce job needs to conform to a specification defined by Pig. This is required because Pig specifies the input and output directories for this job in the script and is responsible for coordinating the native job with the remaining pig mapreduce jobs. Pig might also need to pass some extra configuration, such as the job name, input/output formats, or parallelism, to the native job. Such parameters must be communicated to the native job according to the specification provided by Pig.
  
  Following are some of the approaches to achieving this-
-  1. Ordered inputLoc/outputLoc parameters- This is simplistic approach 
wherein native programs follow up a convention so that their first and second 
parameters are treated as input and output respectively. Pig ''native'' command 
takes the parameters required by the native mapreduce job and passes it to 
native job as command line arguments. It is upto the native program to use 
these parameters for operations it performs.
+  1. '''Ordered inputLoc/outputLoc parameters'''- This is a simplistic approach wherein native programs follow a convention that their first and second parameters are treated as the input and output locations respectively. The pig ''native'' command takes the parameters required by the native mapreduce job and passes them to the native job as command line arguments. It is up to the native program to use these parameters for the operations it performs (a minimal driver following this convention is sketched after the getJobConf example below).
  Thus, only the following lines of code are mandatory inside the native program.
  {{{
  FileInputFormat.setInputPaths(conf, new Path(args[0]));  
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  }}}
-  2. getJobConf Function- Native jobs implement '''getJobConf''' method which 
returns org.apache.hadoop.mapred.JobConf object so that pig can schedule the 
job. This also provides a way to add more pig specific parame
+  1.#2 '''getJobConf Function'''- Native jobs implement a '''getJobConf''' method which returns an ''org.apache.hadoop.mapred.JobConf'' object, so that pig can construct a ''job'' from it and schedule that job inside pig's ''jobcontrol''. This also provides a way to add more pig specific parameters to the job before it is submitted. Most current native hadoop programs create a JobConf and run the hadoop job with ''JobClient.runJob(conf)''. These applications need to move that code into a getJobConf function so that pig can hook into them to get the conf. This will also allow pig to set the input and output directories for the native job.
+ For example-
+ {{{
+ // The job setup is moved out of main() into getJobConf() so that pig can obtain
+ // the JobConf, add its own parameters and schedule the job itself.
+ public static JobConf getJobConf(String[] args) {
+     JobConf conf = new JobConf(WordCount.class);
+     conf.setJobName("wordcount");
+ 
+     conf.setOutputKeyClass(Text.class);
+     conf.setOutputValueClass(IntWritable.class);
+ 
+     conf.setMapperClass(Map.class);
+     conf.setCombinerClass(Reduce.class);
+     conf.setReducerClass(Reduce.class);
+ 
+     conf.setInputFormat(TextInputFormat.class);
+     conf.setOutputFormat(TextOutputFormat.class);
+ 
+     FileInputFormat.setInputPaths(conf, new Path(args[0]));
+     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+     return conf;
+ }
+ 
+ // The program can still be run standalone; pig would instead call getJobConf().
+ public static void main(String[] args) throws Exception {
+     JobClient.runJob(getJobConf(args));
+ }
+ }}}
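+ For comparison, with approach 1 (ordered inputLoc/outputLoc parameters) the native program needs no pig-specific method at all; a minimal sketch of such a driver, with illustrative class and job names and the mapper/reducer setup omitted, could look like-
+ {{{
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.mapred.FileInputFormat;
+ import org.apache.hadoop.mapred.FileOutputFormat;
+ import org.apache.hadoop.mapred.JobClient;
+ import org.apache.hadoop.mapred.JobConf;
+ 
+ // A conventional hadoop driver: pig passes the input location as args[0] and
+ // the output location as args[1], and the program submits the job itself.
+ public class NativeWordCount {
+     public static void main(String[] args) throws Exception {
+         JobConf conf = new JobConf(NativeWordCount.class);
+         conf.setJobName("native-wordcount");
+         // Mapper, reducer and format setup omitted in this sketch; the only
+         // pig-imposed requirement is the ordering of the two path arguments.
+         FileInputFormat.setInputPaths(conf, new Path(args[0]));
+         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+         JobClient.runJob(conf);
+     }
+ }
+ }}}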
+ == Implementation Details ==
+ {{{
+ X = ... ;
+ Y = NATIVE ('mymr.jar' [, 'other.jar' ...]) STORE X INTO 'storeLocation' 
USING storeFunc LOAD 'loadLocation' USING loadFunc [params, ... ];
+ }}}
+ Logical Plan- The logical plan creates an LONative operator with an internal plan that consists of a store and a load operator. The store operator cannot be attached to X at this level, as that would store X at storeLocation for every plan that includes X, which is not intended. Although we could add an LOLoad operator for Y at this point, we delay this to the physical plan and track it with the LONative operator. Also, since Y has a dependency on X, we add the plan of Y whenever we see the plan for X in ''registerQuery''.
  
+ Physical Plan- The physical plan adds the internal store to the plan and connects it to X, and also adds the load to the plan with alias Y. It creates a dependency between the map reduce job for X and the native map reduce job, and also between the native map reduce job and the plan containing Y (which is a POLoad operator). We also create a (customized) MapReduceOper for the native map reduce job.
  
- == Implementation Details ==
- Logical Plan- 
+ MapReduce Plan- Inside JobControlCompiler's compile method, if we find the native mapreduce operator we can create a thread and run the main method of the native map reduce job with the specified parameters. Alternatively, we can call the native map reduce job's getJobConf method to get the job conf for the native job, add pig specific parameters to it, and then add the job to pig's jobcontrol.
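+ 
+ For the second alternative, a rough sketch of the hook (the class ''NativeJobHook'' and its method and parameter names are illustrative, not pig's actual code; only the hadoop ''jobcontrol'' calls are real API) could be-
+ {{{
+ import java.io.IOException;
+ 
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.mapred.FileInputFormat;
+ import org.apache.hadoop.mapred.FileOutputFormat;
+ import org.apache.hadoop.mapred.JobConf;
+ import org.apache.hadoop.mapred.jobcontrol.Job;
+ import org.apache.hadoop.mapred.jobcontrol.JobControl;
+ 
+ // Sketch: take the JobConf returned by the native program's getJobConf, overlay
+ // the locations pig chose in the NATIVE statement, wrap it in a jobcontrol Job
+ // and register it with the JobControl that drives pig's own jobs.
+ public class NativeJobHook {
+     public static Job addNativeJob(JobConf nativeConf, String storeLocation,
+             String loadLocation, Job jobStoringX, JobControl jobControl)
+             throws IOException {
+         // pig stores X at storeLocation, so the native job reads from there ...
+         FileInputFormat.setInputPaths(nativeConf, new Path(storeLocation));
+         // ... and writes to loadLocation, which pig later loads as Y.
+         FileOutputFormat.setOutputPath(nativeConf, new Path(loadLocation));
+ 
+         Job nativeJob = new Job(nativeConf);
+         nativeJob.addDependingJob(jobStoringX); // run only after X has been stored
+         jobControl.addJob(nativeJob);
+         return nativeJob;
+     }
+ }
+ }}}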
+ 
+ == Questions ==
+  1. Do we need a Custom LoadFunc/StoreFunc for this?
+  2. What is the level of customization we need to support?
  
  == References ==
   1. <<Anchor(ref1)>> PIG-506, "Does pig need a NATIVE keyword?", 
https://issues.apache.org/jira/browse/PIG-506
