Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The "LoadStoreRedesignProposal" page has been changed by PradeepKamath.
http://wiki.apache.org/pig/LoadStoreRedesignProposal?action=diff&rev1=21&rev2=22

--------------------------------------------------

  Hadoop has the notion of a single !InputFormat per job. This is restrictive 
since Pig processes multiple inputs in the same map-reduce job (in the case 
of Join, Union or Cogroup). Pig handles this with !PigInputFormat, the 
!InputFormat it communicates to Hadoop as the job's !InputFormat. In 
!PigInputFormat.getSplits(), the implementation processes each input in the 
following manner (a minimal sketch of this flow appears after the list):
  
   * Instantiate the !LoadFunc associated with the input
+  * Make a clone of the Configuration passed in the getSplits() call and 
then invoke !LoadFunc.setLocation() using the clone. The reason a clone is 
necessary here is that, in the setLocation() method, the !LoadFunc would 
generally communicate the location to its underlying !InputFormat. Typically 
!InputFormats store the location in the Configuration for use in the 
getSplits() call. For example, !FileInputFormat does this through 
!FileInputFormat.setInputPaths(Job job, String location). We don't want 
updates to the Configuration for different inputs to overwrite each other - 
hence the clone.
   * Call getInputFormat() on the !LoadFunc and then getSplits() on the 
!InputFormat returned. Note that the above setLocation call needs to happen 
*before* the getSplits() call and the getSplits() call needs to be given a 
!JobContext built out of the "updated (with location)" cloned Configuration.
   * Wrap each returned !InputSplit in a !PigSplit to store information like 
the list of target operators (the pipeline) for this input and the index of 
the split in the List of splits returned by getSplits() (this index is used 
during merge-join index creation); the comments in !PigSplit explain the 
members.
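
Below is a minimal sketch of this getSplits() flow, assuming the Hadoop 0.20 
API. The per-input accessor methods and the !PigSplit constructor shape are 
hypothetical stand-ins, not the actual Pig implementation; the key point is 
the clone-per-input pattern, so each wrapped !InputFormat sees only its own 
location.

{{{
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;

import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;

public abstract class GetSplitsSketch {

    abstract int numInputs();                   // hypothetical accessor
    abstract LoadFunc loadFuncFor(int input);   // hypothetical accessor
    abstract String locationFor(int input);     // hypothetical accessor
    abstract List<?> targetOpsFor(int input);   // hypothetical accessor

    public List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException {
        List<InputSplit> result = new ArrayList<InputSplit>();
        for (int input = 0; input < numInputs(); input++) {
            LoadFunc loader = loadFuncFor(input);

            // Clone the Configuration so the location this input's loader
            // stores in it cannot overwrite another input's location.
            Configuration clone = new Configuration(context.getConfiguration());
            Job clonedJob = new Job(clone);
            loader.setLocation(locationFor(input), clonedJob);

            // getSplits() must be given a JobContext built from the clone
            // that now carries the location.
            InputFormat<?, ?> wrappedFormat = loader.getInputFormat();
            List<InputSplit> splits = wrappedFormat.getSplits(
                new JobContext(clonedJob.getConfiguration(), context.getJobID()));

            for (int i = 0; i < splits.size(); i++) {
                // Wrap each split in a PigSplit carrying the target operators
                // (the pipeline) and the split index (used during merge-join
                // index creation); the constructor shape is illustrative.
                result.add(new PigSplit(splits.get(i), targetOpsFor(input), i));
            }
        }
        return result;
    }
}
}}}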
  
@@ -535, +535 @@

   * Instantiate the !LoadFunc associated with the input represented by the 
!PigSplit passed into !PigInputFormat.createRecordReader()
   * Invoke !LoadFunc.setLocation()
   * Call getInputFormat() on the !LoadFunc and then createRecordReader() on 
the !InputFormat returned. Note that the above setLocation call needs to happen 
*before* the createRecordReader() call and the createRecordReader() call needs 
to be given a !TaskAttemptContext built out of the "updated (with location)" 
Configuration.
+  * Wrap the !RecordReader returned above in a !PigRecordReader, which is 
returned to Hadoop as the !RecordReader. !PigRecordReader has Text as its 
key type (the key is always sent to Hadoop as null, since Pig does not 
really extract a key from input records) and Tuple as its value type (a 
tuple constructed from the input record). A minimal sketch of this 
createRecordReader() flow appears after the list.
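
The following is a minimal sketch of this flow, again assuming the Hadoop 
0.20 API. The loadFuncFor() and locationFor() accessors, the 
getWrappedSplit() method, and the !PigRecordReader constructor shape are 
assumptions for illustration, not the actual Pig code.

{{{
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;

public abstract class CreateRecordReaderSketch {

    abstract LoadFunc loadFuncFor(PigSplit split);   // hypothetical accessor
    abstract String locationFor(PigSplit split);     // hypothetical accessor

    public RecordReader<Text, Tuple> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        PigSplit pigSplit = (PigSplit) split;
        LoadFunc loader = loadFuncFor(pigSplit);

        // setLocation() must run before createRecordReader() ...
        Job job = new Job(context.getConfiguration());
        loader.setLocation(locationFor(pigSplit), job);

        // ... and createRecordReader() must be given a TaskAttemptContext
        // built from the updated (with location) Configuration.
        TaskAttemptContext updated = new TaskAttemptContext(
            job.getConfiguration(), context.getTaskAttemptID());
        InputFormat<?, ?> wrappedFormat = loader.getInputFormat();
        RecordReader<?, ?> wrappedReader = wrappedFormat.createRecordReader(
            pigSplit.getWrappedSplit(), updated);

        // Wrap in PigRecordReader: Text key (always null, since Pig extracts
        // no key from input records) and a Tuple value built from each record.
        return new PigRecordReader(wrappedReader, loader);
    }
}
}}}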
  
  ==== Changes to work with Hadoop OutputFormat model ====
+ Hadoop has the notion of a single !OutputFormat per job. !PigOutputFormat 
is the class Pig indicates to Hadoop as the !OutputFormat for map-reduce 
jobs compiled from Pig scripts. 
+ 
+ In !PigOutputFormat.checkOutputSpecs(), the implementation iterates over 
the !POStore operators in the map and reduce phases and, for each such 
store, does the following (a minimal sketch appears after the list):
+  * Instantiate the !StoreFunc associated with the !POStore
+  * Make a clone of the JobContext passed in the 
!PigOutputFormat.checkOutputSpecs() call and then invoke 
!StoreFunc.setStoreLocation() using the clone. The reason a clone is 
necessary here is that, in the setStoreLocation() method, the !StoreFunc 
would generally communicate the location to its underlying !OutputFormat. 
Typically !OutputFormats store the location in the Configuration for use in 
the checkOutputSpecs() call. For example, !FileOutputFormat does this 
through !FileOutputFormat.setOutputPath(Job job, Path location). We don't 
want updates to the Configuration for different outputs to overwrite each 
other - hence the clone.
+  * Call getOutputFormat() on the !StoreFunc and then checkOutputSpecs() on 
the !OutputFormat returned. Note that the above setStoreLocation call needs to 
happen *before* the checkOutputSpecs() call and the checkOutputSpecs() call 
needs to be given the "updated (with location)" cloned JobContext.
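
A minimal sketch of this checkOutputSpecs() flow, assuming the Hadoop 0.20 
API; the storesIn(), storeFuncFor() and locationFor() accessors are 
hypothetical stand-ins for state kept by !PigOutputFormat.

{{{
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;

import org.apache.pig.StoreFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;

public abstract class CheckOutputSpecsSketch {

    abstract List<POStore> storesIn(JobContext context);  // hypothetical
    abstract StoreFunc storeFuncFor(POStore store);        // hypothetical
    abstract String locationFor(POStore store);            // hypothetical

    public void checkOutputSpecs(JobContext context)
            throws IOException, InterruptedException {
        for (POStore store : storesIn(context)) {
            StoreFunc storer = storeFuncFor(store);

            // Clone the Configuration so one store's output location
            // cannot overwrite another's.
            Configuration clone = new Configuration(context.getConfiguration());
            Job clonedJob = new Job(clone);
            storer.setStoreLocation(locationFor(store), clonedJob);

            // checkOutputSpecs() must be given the cloned context that
            // now carries the location.
            OutputFormat<?, ?> wrappedFormat = storer.getOutputFormat();
            wrappedFormat.checkOutputSpecs(new JobContext(
                clonedJob.getConfiguration(), context.getJobID()));
        }
    }
}
}}}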
+ 
  
  === Remaining Tasks ===
   * !BinStorage needs to implement !LoadMetadata's getSchema() to replace 
the current determineSchema()
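
As a rough illustration only, assuming a getSchema(String, Job) signature 
returning a ResourceSchema as proposed on this page; the method body is 
hypothetical.

{{{
import java.io.IOException;

import org.apache.hadoop.mapreduce.Job;

import org.apache.pig.ResourceSchema;

// Hypothetical: BinStorage answering getSchema() instead of the old
// determineSchema(); only the method relevant to this task is shown.
public class BinStorageSchemaSketch {

    public ResourceSchema getSchema(String location, Job job)
            throws IOException {
        // A real implementation would derive the schema from BinStorage's
        // self-describing serialized records at the given location.
        return null;  // null tells the front end the schema is unknown
    }
}
}}}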
