Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The "LoadStoreRedesignProposal" page has been changed by PradeepKamath.
http://wiki.apache.org/pig/LoadStoreRedesignProposal?action=diff&rev1=40&rev2=41

--------------------------------------------------

  
  For load functions:
  
-  * '''!LoadFunc''' will be pared down to just contain functions directly 
associated with reading data, such as getNext.
+  * '''!LoadFunc''' will be pared down to just contain functions directly 
associated with reading data, such as getNext. '''!LoadFunc''' will now be an 
abstract class with default implementations for some of the methods.
   * A new '''!LoadCaster''' interface will be added.  This interface will 
contain all of the bytesToX methods currently in !LoadFunc.  !LoadFunc will add 
a `getCaster` routine, that will return an object that can provide casts.  The 
existing UTF8!StorageConverter class will change to implement this interface.  
Load functions will then be free to use this class as their caster, or provide 
their own.  For existing load functions that provide all of the bytesToX 
methods, they can implement the !LoadCaster interface and return themselves 
from the getCaster routine.  If a loader does not provide a !LoadCaster, casts 
from byte array to other Pig types will not be supported for data loaded via 
that loader.
   * A new '''!LoadMetadata''' interface will be added.  Calls that find 
metadata about the data being loaded, such as determineSchema, will be placed 
in this interface.  If a loader does not implement this interface, then Pig 
will assume that no metadata is obtainable for this data.
   * A new '''!LoadPushDown''' interface will be added.  Calls that determine 
what can be pushed down and pushing that functionality down into the loader 
will be placed in this interface.  If a loader does not implement this 
interface, then Pig will assume that the loader is not capable of pushing down 
any functionality.
@@ -29, +29 @@

   * A new interface '''!StoreMetadata''' will be added to provide a way for 
storage functions to record metadata.  If a given storage function does not 
implement this interface Pig will assume that it is unable to record metadata.
  
  === Details ===
- '''!LoadFunc'''
+ '''!LoadFunc''' 
- 
+ (For brevity, getAbsolutePath() method's implementation is not shown below)
  {{{
+ 
  /**
-  * This interface is used to implement functions to parse records
-  * from a dataset.
+  * <code>LoadFunc</code> provides functions directly associated with reading 
+  * records from data set.
   */
- public interface LoadFunc {
+ public abstract class LoadFunc {
+     
      /**
       * This method is called by the Pig runtime in the front end to convert 
the
       * input location to an absolute path if the location is relative. The
-      * loadFunc implementation is free to choose how it converts a relative
+      * loadFunc implementation is free to choose how it converts a relative 
       * location to an absolute location since this may depend on what the 
location
       * string represent (hdfs path or some other data source)
-      *
+      * 
       * @param location location as provided in the "load" statement of the 
script
       * @param curDir the current working direction based on any "cd" 
statements
       * in the script before the "load" statement. If there are no "cd" 
statements
-      * in the script, this would be the home directory -
+      * in the script, this would be the home directory - 
       * <pre>/user/<username> </pre>
       * @return the absolute location based on the arguments passed
       * @throws IOException if the conversion is not possible
       */
-     String relativeToAbsolutePath(String location, Path curDir) throws 
IOException;
+     public String relativeToAbsolutePath(String location, Path curDir) 
+             throws IOException {      
+         return getAbsolutePath(location, curDir);
+     }    
  
- 
      /**
-      * Communicate to the loader the location of the object(s) being loaded.
+      * Communicate to the loader the location of the object(s) being loaded.  
-      * The location string passed to the LoadFunc here is the return value of
+      * The location string passed to the LoadFunc here is the return value of 
-      * {...@link LoadFunc#relativeToAbsolutePath(String, String)}
+      * {...@link LoadFunc#relativeToAbsolutePath(String, Path)}. 
Implementations
+      * should use this method to communicate the location (and any other 
information)
+      * to its underlying InputFormat through the Job object.
-      *
+      * 
       * This method will be called in the backend multiple times. 
Implementations
       * should bear in mind that this method is called multiple times and 
should
       * ensure there are no inconsistent side effects due to the multiple 
calls.
-      *
+      * 
-      * @param location Location as returned by
+      * @param location Location as returned by 
-      * {...@link LoadFunc#relativeToAbsolutePath(String, String)}.
+      * {...@link LoadFunc#relativeToAbsolutePath(String, Path)}
       * @param job the {...@link Job} object
+      * store or retrieve earlier stored information from the {...@link 
UDFContext}
       * @throws IOException if the location is not valid.
       */
-     void setLocation(String location, Job job) throws IOException;
+     public abstract void setLocation(String location, Job job) throws 
IOException;
- 
+     
      /**
       * This will be called during planning on the front end. This is the
-      * instance of InputFormat (rather than the class name) because the
+      * instance of InputFormat (rather than the class name) because the 
-      * load function may need to instantiate the InputFormat in order
+      * load function may need to instantiate the InputFormat in order 
       * to control how it is constructed.
       * @return the InputFormat associated with this loader.
-      * @throws IOException if there is an exception during InputFormat
+      * @throws IOException if there is an exception during InputFormat 
       * construction
       */
+     @SuppressWarnings("unchecked")
-     InputFormat getInputFormat() throws IOException;
+     public abstract InputFormat getInputFormat() throws IOException;
  
      /**
-      * This will be called on the front end during planning and not on the 
back
+      * This will be called on the front end during planning and not on the 
back 
       * end during execution.
-      * @return the {...@link LoadCaster} associated with this loader. 
Returning null
+      * @return the {...@link LoadCaster} associated with this loader. 
Returning null 
-      * indicates that casts from byte array are not supported for this loader.
+      * indicates that casts from byte array are not supported for this 
loader. 
       * construction
-      * @throws IOException if there is an exception during LoadCaster
+      * @throws IOException if there is an exception during LoadCaster 
       */
-     LoadCaster getLoadCaster() throws IOException;
+     public LoadCaster getLoadCaster() throws IOException {
+         return new Utf8StorageConverter();
+     }
  
      /**
       * Initializes LoadFunc for reading data.  This will be called during 
execution
@@ -100, +110 @@

       * @param split The input {...@link PigSplit} to process
       * @throws IOException if there is an exception during initialization
       */
+     @SuppressWarnings("unchecked")
-     void prepareToRead(RecordReader reader, PigSplit split) throws 
IOException;
+     public abstract void prepareToRead(RecordReader reader, PigSplit split) 
throws IOException;
  
      /**
-      * Retrieves the next tuple to be processed.
+      * Retrieves the next tuple to be processed. Implementations should NOT 
reuse
+      * tuple objects (or inner member objects) they return across calls and 
+      * should return a different tuple object in each call.
       * @return the next tuple to be processed or null if there are no more 
tuples
       * to be processed.
       * @throws IOException if there is an exception while retrieving the next
       * tuple
       */
-     Tuple getNext() throws IOException;
+     public abstract Tuple getNext() throws IOException;
  
+     /**
+      * This method will be called by Pig both in the front end and back end to
+      * pass a unique signature to the {...@link LoadFunc}. The signature can 
be used
+      * to store into the {...@link UDFContext} any information which the 
+      * {...@link LoadFunc} needs to store between various method invocations 
in the
+      * front end and back end. A use case is to store {...@link 
RequiredFieldList} 
+      * passed to it in {...@link 
LoadPushDown#pushProjection(RequiredFieldList)} for
+      * use in the back end before returning tuples in {...@link 
LoadFunc#getNext()}.
+      * This method will be call before other methods in {...@link LoadFunc}
+      * @param signature a unique signature to identify this LoadFunc
+      */
+     public void setUDFContextSignature(String signature) {
+         // default implementation is a no-op
+     }
+        
  }
- }}}
- The '''!LoadCaster''' interface will include bytesToInt, bytesToLong, etc. 
functions currently in !LoadFunc.  UTF8!StorageConverter will implement this 
interface.
  
- '''Open Question''': Should the methods to convert to a Bag, Tuple and Map 
take a Schema (ResourceSchema?) argument?
+ 
+ }}}
+ The '''!LoadCaster''' interface will include bytesToInt, bytesToLong, etc. 
functions currently in !LoadFunc.  UTF8!StorageConverter will implement this 
interface. The only change will be in bytesToTuple() and bytesToBag() - these 
methods will take an additional !ResourceFieldSchema argument describing the 
schema of the tuple and bag respectively that the bytes need to be cast to.
  
  '''!LoadMetadata'''
  
@@ -198, +226 @@

          public Order[] sortKeyOrders;
      }
  }}}
- Feedback from Pradeep:  We must fix the two level access issues with schema 
of bags in current schema before we make these changes, otherwise that same 
contagion will afflict us here.
+ 
+ '''IMPORTANT NOTE:''': In the !ResourceFieldSchema for a bag field, the only 
field allowed in the subschema is a tuple field. The tuple itself can have a 
schema with more than 1 fields.
  
  '''!ResourceStatistics'''
  
@@ -239, +268 @@

   * be pushed into the loader.  If a given loader does not implement this 
interface
   * it will be assumed that it is unable to accept any functionality for push 
down.
   */
- interface LoadPushDown {
+ public interface LoadPushDown {
  
      /**
-      * Set of possible operations that Pig can push down to a loader.
+      * Set of possible operations that Pig can push down to a loader. 
       */
-     enum OperatorSet {PROJECTION, SELECTION};
+     enum OperatorSet {PROJECTION};
  
      /**
-      * Determine the operators that can be pushed to the loader.
+      * Determine the operators that can be pushed to the loader.  
       * Note that by indicating a loader can accept a certain operator
       * (such as selection) the loader is not promising that it can handle
-      * all selections.  When it is passed the actual operators to
+      * all selections.  When it is passed the actual operators to 
       * push down it will still have a chance to reject them.
       * @return list of all features that the loader can support
       */
      List<OperatorSet> getFeatures();
  
      /**
-      * Propose a set of operators to push to the loader.
-      * @param plan Plan containing proposed operators
-      * @return true if the loader accepts the plan, false if not.
-      * If false is returned Pig may choose to trim the plan and call
-      * this method again.
+      * Indicate to the loader fields that will be needed.  This can be useful 
for
+      * loaders that access data that is stored in a columnar format where 
indicating
+      * columns to be accessed a head of time will save scans.  This method 
will
+      * not be invoked by the Pig runtime if all fields are required. So 
implementations
+      * should assume that if this method is not invoked, then all fields from 
+      * the input are required. If the loader function cannot make use of this 
+      * information, it is free to ignore it by returning an appropriate 
Response
+      * @param requiredFieldList RequiredFieldList indicating which columns 
will be needed.
+      */
+     public RequiredFieldResponse pushProjection(RequiredFieldList 
+             requiredFieldList) throws FrontendException;
+     
+     public static class RequiredField implements Serializable {
+         
+         private static final long serialVersionUID = 1L;
+         
+         // will hold name of the field (would be null if not supplied)
+         private String alias; 
+ 
+         // will hold the index (position) of the required field (would be -1 
if not supplied), index is 0 based
+         private int index; 
+ 
+         // A list of sub fields in this field (this could be a list of hash 
keys for example). 
+         // This would be null if the entire field is required and no specific 
sub fields are required. 
+         // In the initial implementation only one level of subfields will be 
populated.
+         private List<RequiredField> subFields;
+         
+         // Type of this field - the value could be any current PIG DataType 
(as specified by the constants in DataType class).
+         private byte type;
+ 
+         public RequiredField() {
+             // to allow piece-meal construction
+         }
+         
+         /**
+          * @param alias
+          * @param index
+          * @param subFields
+          * @param type
-      */
+          */
-     boolean pushOperators(OperatorPlan plan);
+         public RequiredField(String alias, int index,
+                 List<RequiredField> subFields, byte type) {
+             this.alias = alias;
+             this.index = index;
+             this.subFields = subFields;
+             this.type = type;
+         }
+ 
+         /**
+          * @return the alias
+          */
+         public String getAlias() {
+             return alias;
+         }
+ 
+         /**
+          * @return the index
+          */
+         public int getIndex() {
+             return index;
+         }
+ 
+         
+         /**
+          * @return the required sub fields. The return value is null if all
+          *         subfields are required
+          */
+         public List<RequiredField> getSubFields() {
+             return subFields;
+         }
+         
+         public void setSubFields(List<RequiredField> subFields)
+         {
+             this.subFields = subFields;
+         }
+ 
+         /**
+          * @return the type
+          */
+         public byte getType() {
+             return type;
+         }
+ 
+         public void setType(byte t) {
+             type = t;
+         }
+         
+         public void setIndex(int i) {
+             index = i;
+         }
+         
+         public void setAlias(String alias)
+         {
+             this.alias = alias;
+         }
+ 
+     }
+ 
+     public static class RequiredFieldList implements Serializable {
+         
+         private static final long serialVersionUID = 1L;
+         
+         // list of Required fields, this will be null if all fields are 
required
+         private List<RequiredField> fields = new ArrayList<RequiredField>(); 
+         
+         /**
+          * @param fields
+          */
+         public RequiredFieldList(List<RequiredField> fields) {
+             this.fields = fields;
+         }
+ 
+         /**
+          * @return the required fields - this will be null if all fields are
+          *         required
+          */
+         public List<RequiredField> getFields() {
+             return fields;
+         }
+ 
+         public RequiredFieldList() {
+         }
+         
+         public void add(RequiredField rf)
+         {
+             fields.add(rf);
+         }
+     }
+ 
+     public static class RequiredFieldResponse {
+         // the loader should pass true if it will return data containing
+         // only the List of RequiredFields in that order. false if it
+         // will return all fields in the data
+         private boolean requiredFieldRequestHonored;
+ 
+         public RequiredFieldResponse(boolean requiredFieldRequestHonored) {
+             this.requiredFieldRequestHonored = requiredFieldRequestHonored;
+         }
+ 
+         // true if the loader will return data containing only the List of
+         // RequiredFields in that order. false if the loader will return all
+         // fields in the data
+         public boolean getRequiredFieldResponse() {
+             return requiredFieldRequestHonored;
+         }
+ 
+         // the loader should pass true if the it will return data containing
+         // only the List of RequiredFields in that order. false if the it
+         // will return all fields in the data
+         public void setRequiredFieldResponse(boolean honored) {
+             requiredFieldRequestHonored = honored;
+         }
+     }
+ 
+     
  }
  }}}
- An open question for !LoadPushdown is how do we communicate what needs to be 
pushed down?  In the above, !OperatorPlan is envisioned as a simple syntax tree 
that would be sufficient to communicate operators and their expressions to the 
storage functions.  Initially we considered using the !LogicalPlan as is.  But 
our !LogicalPlan is a mess right now.  It's also tightly tied to our 
implementation, so exposing it as an interface is unwise. We could use a SQL 
like string, but then loaders have to implement a parser.
  
  '''!StoreFunc'''
  
@@ -681, +857 @@

  
   * Updated section on streaming to reflect the current implementation.
  
+ Mar 1 2010, Pradeep Kamath
+  * Updated interfaces/abstract classes to reflect current state
+ 

Reply via email to