Dear Wiki user, You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.
The "NativeMapReduce" page has been changed by Aniket Mokashi. http://wiki.apache.org/pig/NativeMapReduce?action=diff&rev1=5&rev2=6
--------------------------------------------------

== Syntax ==
To support native mapreduce jobs, pig will support the following syntax-
{{{
X = ... ;
Y = NATIVE ('mymr.jar' [, 'other.jar' ...]) STORE X INTO 'storeLocation' USING storeFunc LOAD 'loadLocation' USING loadFunc [params, ... ];
}}}

The purpose of [[#ref2|pig streaming]] is to send data through an external script or program, transforming a dataset into a different dataset by means of a custom script written in any programming or scripting language. Pig streaming uses hadoop streaming support to achieve this. Pig can register custom programs in a script, either inline in the stream clause or using a define clause. Pig also provides a level of data guarantees on the processing, features for job management, and the ability to ship the scripts via the distributed cache (configurable). Streaming applications run locally on the individual mapper and reducer nodes to transform the data.

=== Hive Transforms ===
With [[#ref3|hive transforms]], users can also plug in their own custom mappers and reducers in the data stream.
Basically, this is another application of the custom streaming supported by hadoop. These mappers and reducers can therefore be written in any scripting language, and they can be registered in the distributed cache to help performance. To support custom map reduce programs written in java ([[#ref4|bizo's blog]]), we can use our custom mappers and reducers as data streaming functions and use them to transform the data with 'java -cp mymr.jar'. This does not invoke a map reduce task; it transforms the data locally inside the map or reduce task.

Thus, both of these features can transform the data submitted to a map reduce job (mapper) into a different data set, and/or transform the data produced by a map reduce job (reducer) into a different data set. Note, however, that the transformation takes place on a single machine and does not take advantage of the map reduce framework itself. Also, these constructs only allow custom transformations inside the data pipeline; they do not break the pipeline.

A native mapreduce job needs to conform to a specification defined by Pig. This is required because Pig specifies the input and output directories for the job in the script, and is responsible for coordinating the native job with the remaining pig mapreduce jobs. Pig might also need to pass extra configuration, such as the job name, input/output formats, or parallelism, to the native job. For communicating such parameters, the native job must follow the specification provided by Pig. Following are some approaches for achieving this-
 1. '''Ordered inputLoc/outputLoc parameters'''- This is a simple approach wherein native programs follow a convention that their first and second parameters are treated as the input and output locations respectively. Pig's ''native'' command takes the parameters required by the native mapreduce job and passes them to the native job as command line arguments. It is up to the native program to use these parameters in the operations it performs. Thus, only the following lines of code are mandatory inside the native program.
{{{
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
}}}
 1.#2 '''getJobConf Function'''- Native jobs implement a '''getJobConf''' method that returns an ''org.apache.hadoop.mapred.JobConf'' object, so that pig can construct a ''job'' from it and schedule that job inside pig's ''jobcontrol''. This also provides a way to add more pig specific parameters to the job before it is submitted. Most current native hadoop programs create a JobConf and run the hadoop job with ''JobClient.runJob(conf)''. Such applications need to move this code into a getJobConf function so that pig can hook into them to get the conf. This also allows pig to set the input and output directories for the native job.
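How pig actually calls into the native job's getJobConf method is left open here. One possibility, sketched below purely as an assumption (the class name ''NativeJobHook'' and method ''fetchJobConf'' are hypothetical, not Pig APIs), is to load the job class from the registered jar and invoke getJobConf reflectively:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical sketch: obtain the native job's configuration by calling
// its getJobConf() reflectively. In a real deployment the returned object
// would be an org.apache.hadoop.mapred.JobConf.
public class NativeJobHook {
    public static Object fetchJobConf(Class<?> nativeJobClass) throws Exception {
        Method getJobConf = nativeJobClass.getMethod("getJobConf");
        // Support both static and instance getJobConf implementations.
        Object target = Modifier.isStatic(getJobConf.getModifiers())
                ? null
                : nativeJobClass.getDeclaredConstructor().newInstance();
        return getJobConf.invoke(target);
    }
}
```

Pig could then add its own parameters (input/output paths, job name) to the returned conf before submitting the job.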
For example-
{{{
public static JobConf getJobConf(String[] args) {
  JobConf conf = new JobConf(WordCount.class);
  conf.setJobName("wordcount");

  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);

  conf.setMapperClass(Map.class);
  conf.setCombinerClass(Reduce.class);
  conf.setReducerClass(Reduce.class);

  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  return conf;
}

public static void main(String[] args) throws Exception {
  JobClient.runJob(getJobConf(args));
}
}}}

== Implementation Details ==
{{{
X = ... ;
Y = NATIVE ('mymr.jar' [, 'other.jar' ...]) STORE X INTO 'storeLocation' USING storeFunc LOAD 'loadLocation' USING loadFunc [params, ... ];
}}}
Logical Plan- The logical plan creates an LONative operator with an internal plan that consists of a store and a load operator. The store operator cannot be attached to X at this level, since that would store X at storeLocation for every plan that includes X, which is not intended. Although we could add the LOLoad operator for Y at this point, we delay this to the physical plan and track it with the LONative operator. Also, since Y has a dependency on X, we add the plan for Y whenever we see the plan for X in ''registerQuery''.

Physical Plan- The physical plan adds the internal store to the physical plan and connects it to X, and adds the load to the plan with alias Y. It also creates a dependency between the map reduce job for X and the native map reduce job, and between the native map reduce job and the plan containing Y (which is a POLoad operator). We also create a customized MapReduceOper for the native map reduce job.
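One option for handing off control to the native job at this stage, sketched below as an assumption (''RunNativeJob'' is a hypothetical name and the real planner integration is omitted), is to run the native jar's main method on its own thread and wait for it, so that jobs depending on its output are only scheduled afterwards:

```java
import java.lang.reflect.Method;

// Hypothetical sketch: invoke a native job's main(String[]) on a separate
// thread and block until it completes. RunNativeJob is an illustrative
// name, not part of Pig.
public class RunNativeJob {
    public static void runAndWait(Class<?> mainClass, String[] args) throws Exception {
        Method main = mainClass.getMethod("main", String[].class);
        Thread t = new Thread(() -> {
            try {
                // The cast passes the whole array as the single varargs argument.
                main.invoke(null, (Object) args);
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException(e);
            }
        });
        t.start();
        t.join(); // wait before scheduling dependent pig jobs
    }
}
```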
MapReduce Plan- Inside the JobControlCompiler's compile method, if we find the native mapreduce operator, we can create a thread and run the main method of the native map reduce job with the specified parameters. Alternatively, we can call into the native map reduce job's getJobConf method to get the conf for the native job, add pig specific parameters to it, and then add the job to pig's jobcontrol.

== Questions ==
 1. Do we need a custom LoadFunc/StoreFunc for this?
 2. What is the level of customization we need to support?

== References ==
 1. <<Anchor(ref1)>> PIG-506, "Does pig need a NATIVE keyword?", https://issues.apache.org/jira/browse/PIG-506