Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The "NativeMapReduce" page has been changed by Aniket Mokashi.
http://wiki.apache.org/pig/NativeMapReduce?action=diff&rev1=8&rev2=9

--------------------------------------------------

+ = Under Construction =
  #format wiki
  #language en
  
@@ -18, +19 @@

 To support native mapreduce jobs, Pig will support the following syntax:
  {{{
  X = ... ;
- Y = MAPREDUCE ('mymr.jar' [, 'other.jar' ...]) STORE X INTO 'storeLocation' USING storeFunc LOAD 'loadLocation' USING loadFunc AS schema [params, ... ];
+ Y = MAPREDUCE 'mymr.jar' [('other.jar', ...)] STORE X INTO 'inputLocation' USING storeFunc LOAD 'outputLocation' USING loadFunc AS schema [`params, ... `];
  }}}
  
- This stores '''X''' into the '''storeLocation''' using '''storeFunc''', which 
is then used by native mapreduce to read its data. After we run mymr.jar's 
mapreduce, we load back the data from '''loadLocation''' into alias '''Y''' 
using '''loadFunc'''.
+ This stores '''X''' into '''inputLocation''' using '''storeFunc''', which the native mapreduce job then reads as its input. After mymr.jar's mapreduce job finishes, we load the data back from '''outputLocation''' into alias '''Y''' using '''loadFunc''' with the declared '''schema'''.
  
 ''params'' are extra parameters required for the native mapreduce job.
  
- '''mymr.jar is any mapreduce jar file which can be run through "hadoop -jar 
mymr.jar params" command.'''
+ mymr.jar is any mapreduce jar file that can be run through the '''"hadoop jar mymr.jar params"''' command. Thus, the contract for ''inputLocation'' and ''outputLocation'' is typically managed through ''params''.
  
 For example, to run the wordcount mapreduce program from Pig, we write:
  {{{
  A = load 'WordcountInput.txt';
- B = MAPREDUCE wordcount.jar Store A into 'inputDir' Load 'outputDir' as (word:chararray, count: int) org.myorg.WordCount inputDir outputDir;
+ B = MAPREDUCE 'wordcount.jar' Store A into 'inputDir' Load 'outputDir' as (word:chararray, count: int) `org.myorg.WordCount inputDir outputDir`;
  }}}
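+ 
+ On the jar side, the driver's main receives the trailing params as its arguments (RunJar consumes the leading class name). A minimal sketch of such a driver follows; this is the standard Hadoop wordcount driver shape written here for illustration, not code from this page:
+ {{{
+ // Hypothetical driver: given "org.myorg.WordCount inputDir outputDir",
+ // this main sees args[0] = inputDir (the directory Pig stored A into)
+ // and args[1] = outputDir (the directory Pig loads B back from).
+ package org.myorg;
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.mapreduce.Job;
+ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+ 
+ public class WordCount {
+     public static void main(String[] args) throws Exception {
+         Job job = new Job(new Configuration(), "wordcount");
+         job.setJarByClass(WordCount.class);
+         // mapper/reducer setup omitted for brevity
+         FileInputFormat.addInputPath(job, new Path(args[0]));    // Pig's STORE target
+         FileOutputFormat.setOutputPath(job, new Path(args[1]));  // Pig's LOAD source
+         System.exit(job.waitForCompletion(true) ? 0 : 1);
+     }
+ }
+ }}}
+ Note the System.exit() at the end; the Security Manager section below explains why Pig has to guard against exactly this.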
  
  == Comparison with similar features ==
@@ -45, +46 @@

 With native job support, Pig can run native mapreduce jobs written in Java that convert one data set into another by applying custom map and reduce functions of any complexity.
  
  == Implementation Details ==
+ 
  {{{
  X = ... ;
- Y = MAPREDUCE ('mymr.jar' [, 'other.jar' ...]) STORE X INTO 'storeLocation' USING storeFunc LOAD 'loadLocation' USING loadFunc [params, ... ];
+ Y = MAPREDUCE 'mymr.jar' [('other.jar', ...)] STORE X INTO 'inputLocation' USING storeFunc LOAD 'outputLocation' USING loadFunc AS schema [`params, ... `];
  }}}
- Logical Plan- Logical Plan creates a LONative operator with an internal plan 
that consists of a store and a load operator. The store operator cannot be 
attached to X at this level as it would start storing X at storeLocation for 
every plan that includes X which is not intended. Although we can LOLoad 
operator for Y at this point, we delay this to physical plan and track this 
with LONative operator. Also, since Y has dependency on X, we add plan of Y 
whenever we see plan for X in ''registerQuery''.
  
- Physical Plan- Physical Plan adds the internal store to the physical plan and 
connects it to X and also adds the load to the plan with alias Y. Also, it 
creates a dependency between map reduce job for X and native map reduce job, 
and also between native map reduce job and plan having Y (which is a POLoad 
operator). We also create a MapReduceOper (customized) for the native map 
reduce job.
+ === Pig Plans ===
+ Logical Plan- The logical plan creates an LONative operator with an internal plan that consists of a store and a load operator. The store operator cannot be attached to X at this level, as that would store X at inputLocation for every plan that includes X, which is not intended. Although we could add an LOLoad operator for Y at this point, we delay this to the physical plan and track it with the LONative operator. Since Y has a dataflow dependency on X, we connect the operators corresponding to these aliases in the logical plan.
  
- MapReduce Plan- Inside the JobControlCompiler's compile method if we find the 
native mapreduce operator we can create a thread and run the Main method of 
native map reduce job with the specified parameters. Alternatively, we can call 
into native map reduce job's getJobConf method to get the job conf for the 
native job, then we can add pig specific parameters to this job and then add 
the job inside pig's jobcontrol.
+ {{{
+     X = ... ;
+         |
+         |
+         |                            |--- (LOStore) Store X into 'inputLocation'
+     Y = MapReduce ... ;              |
+       (LONative)   --  innerPlan ----|
+         mymr.jar                     |
+         params                       |--- (LOLoad) Load 'outputLocation'
+         |
+         |
+         ...
+ }}}
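+ 
+ A rough sketch of what such an operator needs to carry (names are hypothetical; Pig's actual LONative differs): the jar, the params, and the internal store/load plan, which is kept separate rather than spliced into the main plan:
+ {{{
+ // Illustrative only: the native operator just records what the later
+ // compile phases need; the inner store/load pair lives in its own plan.
+ import java.util.Arrays;
+ import java.util.List;
+ 
+ public class NativeOperatorSketch {
+     private final String nativeMRJar;     // e.g. "mymr.jar"
+     private final List<String> params;    // extra parameters for the job
+     private final List<String> innerPlan; // stand-in for the store/load pair
+ 
+     public NativeOperatorSketch(String jar, List<String> params, List<String> innerPlan) {
+         this.nativeMRJar = jar;
+         this.params = params;
+         this.innerPlan = innerPlan;
+     }
+ 
+     public static void main(String[] args) {
+         NativeOperatorSketch op = new NativeOperatorSketch(
+             "mymr.jar",
+             Arrays.asList("org.myorg.WordCount", "inputDir", "outputDir"),
+             Arrays.asList("Store X into 'inputLocation'", "Load 'outputLocation'"));
+         System.out.println(op.nativeMRJar + " " + op.params + " " + op.innerPlan);
+     }
+ }
+ }}}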
+ TypeCastInserter- Because the internal load declares '''schema''', the TypeCastInserter pass also needs to visit it and insert the casts that coerce Y's fields to the declared types.
  
- == Questions ==
-  1. Do we need a Custom LoadFunc/StoreFunc for this?
-  2. What is the level of customization we need to support?
+ Physical Plan- The logical plan is visited to convert the internal load/store combination into the corresponding physical operators, and the connections are maintained as per the logical plan.
+ {{{
+     X = ... ;
+         |
+         |
+         |                            |--- (POStore) Store X into 'inputLocation'
+     Y = MapReduce ... ;              |
+       (PONative)   --  innerPlan ----|
+         mymr.jar                     |
+         params                       |--- (POLoad) Load 'outputLocation'
+         |
+         |
+         ...
+ }}}
+ 
+ MapReduce Plan- While compiling the mapreduce plan with the MRCompiler, we introduce a customized MapReduceOper for the native mapreduce job:
+ {{{
+     X = ... ;
+         |
+         |
+         |                            |--- (POStore) Store X into 'inputLocation'
+     Y = MapReduce ... ;              |
+       (PONative)   --  innerPlan ----|
+         mymr.jar                     |
+         params                       |--- (POLoad) Load 'outputLocation'
+         |
+         |
+         ...
+ }}}
+ Inside JobControlCompiler's compile method, if we find the native mapreduce operator, we run org.apache.hadoop.util.RunJar's main method with the specified parameters.
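+ 
+ A minimal sketch of that invocation (the argument layout is assumed from the syntax above; this is not the actual JobControlCompiler code):
+ {{{
+ // Illustrative only: run the native jar in-process the way "hadoop jar"
+ // does, by handing RunJar the jar path followed by the user's params.
+ import org.apache.hadoop.util.RunJar;
+ 
+ public class NativeJobRunner {
+     public static void runNativeJob(String jarPath, String[] params) throws Throwable {
+         String[] args = new String[params.length + 1];
+         args[0] = jarPath;  // e.g. "mymr.jar"
+         System.arraycopy(params, 0, args, 1, params.length);
+         RunJar.main(args);  // same entry point the hadoop jar command uses
+     }
+ }
+ }}}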
+ 
+ === Security Manager ===
+ The hadoop jar command is equivalent to invoking org.apache.hadoop.util.RunJar's main function with the required arguments. RunJar internally can invoke several levels of driver classes before executing the hadoop job (for example, hadoop-example.jar). Since such driver classes commonly call System.exit() when they finish, running RunJar inside Pig's JVM would terminate Pig itself; we therefore install a custom security manager that intercepts these exit calls for the duration of the native job.
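+ 
+ A minimal sketch of such a security manager (hypothetical; not Pig's actual class):
+ {{{
+ // Illustrative only: allow everything, but turn System.exit() into an
+ // exception so a driver's exit ends the native job rather than Pig's JVM.
+ import java.security.Permission;
+ 
+ public class NativeJobSecurityManager extends SecurityManager {
+     private int exitCode = 0;
+ 
+     @Override
+     public void checkPermission(Permission perm) {
+         // no-op: we only care about intercepting JVM exit
+     }
+ 
+     @Override
+     public void checkExit(int status) {
+         exitCode = status;
+         throw new SecurityException("intercepted System.exit(" + status + ")");
+     }
+ 
+     public int getExitCode() { return exitCode; }
+ }
+ }}}
+ The caller installs it with System.setSecurityManager() around the RunJar call, catches the resulting SecurityException, and restores the previous manager afterwards.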
+ 
+ === Pig Stats ===
  
  == References ==
   1. <<Anchor(ref1)>> PIG-506, "Does pig need a NATIVE keyword?", 
https://issues.apache.org/jira/browse/PIG-506
