Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The following page has been changed by AlanGates:
http://wiki.apache.org/pig/LoadStoreRedesignProposal

------------------------------------------------------------------------------
  
   1. The Slicer interface is redundant.  Remove it and allow users to directly 
use Hadoop !InputFormats in Pig.
   1. It is not currently easy to use a separate !OutputFormat for a 
!StoreFunc.  This should be made easy to allow users to store data into 
locations other than HDFS.
-  1. Currently users that wish to operate on Pig and Map-Reduce are required 
to write Hadoop !InputFormat and !OutpuFormat as well as a Pig loader and Pig 
storage functions.  While Pig load and store functions will always be necessary 
to take the most advantage of Pig, it would be good for users to be able to use 
Hadoop !InputFormat and !OutputFormat classes directly to minimize the data 
interchange cost.
+  1. Currently, users who wish to operate on both Pig and Map-Reduce are required 
to write Hadoop !InputFormat and !OutputFormat classes as well as Pig load and 
store functions.  While Pig load and store functions will always be necessary 
to take the most advantage of Pig, it would be good for users to be able to use 
Hadoop !InputFormat and !OutputFormat classes directly to minimize the data 
interchange cost.
-  1. The major difference between a Hadoop !InputFormat and a Pig load 
function is the data model.  Hadoop views data as key-value pairs, Pig as a 
tuple.  Similarly for !OutputFormat and store functions.  
   1. New storage formats such as Zebra are being implemented for Hadoop that 
include metadata such as schemas and statistics.  The !LoadFunc interface 
needs to allow Pig to obtain this metadata.  There is a determineSchema call in 
the current interface.  More functions may be necessary.
   1. These new storage formats also plan to support pushing of, at least, 
projection and selection into the storage layer.  Pig needs to be able to query 
loaders to determine what, if any, pushdown capabilities they support and then 
make use of those capabilities.
   1. There already exists one metadata system in Hadoop (Hive's metastore) and 
there is a proposal to add another (Owl).  Pig needs to be able to query these 
metadata systems for information about data to be read.  It also needs to be 
able to record information to these metadata systems when writing data.  The 
load and store functions are a reasonable place to do these operations since 
that is the point at which Pig is reading and writing data.  This will also 
allow Pig to read and write data from and to multiple metadata stores in single 
Pig Latin scripts if that is desired.
@@ -22, +21 @@

  == Interfaces ==
  With these proposed changes, load and store functions in Pig are becoming 
very weighty objects.  The current !LoadFunc interface already
  provides mechanisms for reading the data, getting some schema information, 
casting data, and some placeholders for pushing down projections into
- the loader.  This proposal will add more file level metadata, global 
metadata, selection push down, plus interaction with !InputFormats.  It will
+ the loader.  This proposal will add more file level metadata, selection push 
down, plus interaction with !InputFormats.  It will
  also add !OutputFormats to store functions.  If we create two monster 
interfaces that attempt to provide everything, the burden of creating a
  new load or store function in Pig will become overwhelming.  Instead, this 
proposal envisions splitting the interface into a number of
  interfaces, each with a clear responsibility.  Load and store functions will 
then only be required to implement the interfaces for functionality they offer.
  
  For load functions:
-  * !LoadFunc will be pared down to just contain functions directly associated 
with reading data, such as getNext.
+  * '''!LoadFunc''' will be pared down to just contain functions directly 
associated with reading data, such as getNext.
-  * A new !LoadCaster interface will be added.  This interface will contain 
all of the bytesToX methods currently in !LoadFunc.  !LoadFunc will add a 
getCaster routine, that will return an object that can provide casts.  The 
existing UTF8!StorageConverter class will change to implement this interface.  
Load functions will then be free to use this class as their caster, or provide 
their own.  For existing load functions that provide all of the bytesToX 
methods, they can implement the !LoadCaster interface and return themselves 
from the getCaster routine.  If a loader does not provide a !LoadCaster, casts 
from byte array to other pig types will not be supported for data loaded via 
that loader.
+  * A new '''!LoadCaster''' interface will be added.  This interface will 
contain all of the bytesToX methods currently in !LoadFunc.  !LoadFunc will add 
a `getCaster` routine that will return an object that can provide casts.  The 
existing UTF8!StorageConverter class will change to implement this interface.  
Load functions will then be free to use this class as their caster, or provide 
their own.  Existing load functions that provide all of the bytesToX methods 
can implement the !LoadCaster interface and return themselves from the 
getCaster routine.  If a loader does not provide a !LoadCaster, casts from 
byte array to other Pig types will not be supported for data loaded via 
that loader.
-  * A new !LoadMetadata interface will be added.  Calls that find metadata 
about the data being loaded, such as determineSchema, will be placed in this 
interface.  If a loader does not implement this interface, then Pig will assume 
that no metadata is obtainable for this data.
+  * A new '''!LoadMetadata''' interface will be added.  Calls that find 
metadata about the data being loaded, such as determineSchema, will be placed 
in this interface.  If a loader does not implement this interface, then Pig 
will assume that no metadata is obtainable for this data.
-  * A new !LoadPushDown interface will be added.  Calls that determine what 
can be pushed down and pushing that functionality down into the loader will be 
placed in this interface.  If a loader does not implement this interface, then 
Pig will assume that the loader is not capable of pushing down any 
functionality.
+  * A new '''!LoadPushDown''' interface will be added.  Calls that determine 
what can be pushed down, and that push that functionality down into the loader, 
will be placed in this interface.  If a loader does not implement this 
interface, then Pig will assume that the loader is not capable of pushing down 
any functionality.
  
  For store functions:
-  * A new method getOutputFormat will be added to !StoreFunc to allow a 
storage function to return its output format.
+  * A new method `getOutputFormat` will be added to !StoreFunc to allow a 
storage function to return its output format.
-  * A new interface !StoreMetadata will be added to provide a way for storage 
functions to record metadata.  If a given storage function does not implement 
this interface Pig will assume that it is unable to record metadata.
+  * A new interface '''!StoreMetadata''' will be added to provide a way for 
storage functions to record metadata.  If a given storage function does not 
implement this interface, Pig will assume that it is unable to record metadata.
  
  === Details ===
  
- !LoadFunc:
+ '''!LoadFunc'''
  
  {{{
  /**
   * This interface is used to implement functions to parse records
   * from a dataset.
   */
- public interface LoadFunc {
+ interface LoadFunc {
+ 
+     /**
+      * Communicate to the loader the URIs used in Pig Latin to refer to the 
+      * object(s) being loaded.  That is, if the PL script is
+      * <b>A = load 'bla'</b>
+      * then 'bla' is the URI.  Load functions should assume that if no
+      * scheme is provided in the URI it is an hdfs file.  This will be 
+      * called during planning on the front end, not during execution on
+      * the backend.
+      * @param uri URIs referenced in load statement.
+      */
+     void setURI(URI[] uri);
+     
+     /**
+      * Return the InputFormat associated with this loader.  This will be
+      * called during planning on the front end.  The LoadFunc need not
+      * carry the InputFormat information to the backend, as it will
+      * be provided with the appropriate RecordReader there.
+      */
+     InputFormat getInputFormat();
+ 
+     /**
+      * Return the LoadCaster associated with this loader.  Returning
+      * null indicates that casts from byte array are not supported
+      * for this loader.  This will be called on the front end during
+      * planning and not on the back end during execution.
+      */
+     LoadCaster getLoadCaster();
+ 
+     /**
+      * Initializes LoadFunc for reading data.  This will be called during 
execution
+      * before any calls to getNext.
+      * @param reader RecordReader to be used by this instance of the LoadFunc
+      */
+     void prepareToRead(RecordReader reader);
+ 
      /**
       * Retrieves the next tuple to be processed.
       * @return the next tuple to be processed or null if there are no more 
tuples
@@ -55, +90 @@

       */
      Tuple getNext() throws IOException;
  
-     /**
-      * Communicate to the loader the URI used in Pig Latin to refer to the 
-      * object being loaded.  That is, if the PL script is
-      * <b>A = load 'bla'</b>
-      * then 'bla' is the URI.  Load functions should assume that if no
-      * scheme is provided in the URI it is an hdfs file.
-      * @param uri URI referenced in load statement
-      */
-     void setURI(URI uri);
-     
-     /**
-      * Return the InputFormat associated with this loader.
-      */
-     InputFormat getInputFormat();
- 
-     /**
-      * Return the LoadCaster associated with this loader.  Returning
-      * null indicates that casts from byte array are not supported
-      * for this loader.
-      */
-     LoadCaster getLoadCaster();
  }
  
  }}}
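 
 As an illustration only (not part of the proposed interface), a simple 
tab-delimited loader along the lines of !PigStorage might look roughly like the 
following under this proposal.  The class name, the choice of !TextInputFormat, 
and the tab-splitting logic are assumptions for the sketch, not part of the 
design.
 
 {{{
 // Illustrative sketch only.  TupleFactory and DataByteArray are existing
 // Pig classes; the rest follows the interface above.
 public class SimpleTextLoader implements LoadFunc {
 
     private RecordReader reader;
     private TupleFactory tupleFactory = TupleFactory.getInstance();
 
     public void setURI(URI[] uri) {
         // A real loader would remember the location(s); this sketch relies
         // on Pig and Hadoop to set up the input paths.
     }
 
     public InputFormat getInputFormat() {
         // Delegate record boundaries to an existing Hadoop InputFormat.
         return new TextInputFormat();
     }
 
     public LoadCaster getLoadCaster() {
         // Reuse the default caster rather than writing bytesToX methods.
         return new UTF8StorageConverter();
     }
 
     public void prepareToRead(RecordReader reader) {
         this.reader = reader;
     }
 
     public Tuple getNext() throws IOException {
         try {
             if (!reader.nextKeyValue()) return null;
             // TextInputFormat's value is the line; split it on tabs and wrap
             // each field as a byte array so casts go through the caster.
             String[] fields = reader.getCurrentValue().toString().split("\t");
             Tuple t = tupleFactory.newTuple(fields.length);
             for (int i = 0; i < fields.length; i++) {
                 t.set(i, new DataByteArray(fields[i].getBytes()));
             }
             return t;
         } catch (InterruptedException e) {
             throw new IOException(e);
         }
     }
 }
 }}}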
  
+ Open questions for !LoadFunc:
+  1. Should setURI instead be setLocation and just take a String?  The 
advantage of a URI is we know exactly what users are trying to communicate 
with, and we can define what Pig does in default cases (when a scheme is not 
given).  The disadvantage is forcing more structure on users and their load 
functions.  I'm still pretty strongly on the side of using URI.
+ 
- The !LoadCaster interface will include bytesToInt, bytesToLong, etc. functions
+ The '''!LoadCaster''' interface will include bytesToInt, bytesToLong, etc. 
functions
  currently in !LoadFunc.  UTF8!StorageConverter will implement this interface.
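 
 A rough sketch of the shape of '''!LoadCaster''' follows.  The exact method 
list is simply whatever bytesToX methods exist on !LoadFunc today; the names 
and signatures below are illustrative.
 
 {{{
 /**
  * Collects the casting methods currently on LoadFunc.  A loader returns an
  * implementation of this from getLoadCaster, or null if it cannot cast.
  */
 public interface LoadCaster {
     Integer bytesToInteger(byte[] b) throws IOException;
     Long bytesToLong(byte[] b) throws IOException;
     Float bytesToFloat(byte[] b) throws IOException;
     Double bytesToDouble(byte[] b) throws IOException;
     String bytesToCharArray(byte[] b) throws IOException;
     Map<Object, Object> bytesToMap(byte[] b) throws IOException;
     Tuple bytesToTuple(byte[] b) throws IOException;
     DataBag bytesToBag(byte[] b) throws IOException;
 }
 }}}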
  
- !LoadMetadata:
+ '''!LoadMetadata'''
  {{{
  
  /**
@@ -102, +119 @@

      LoadSchema getSchema();
  
      /**
-      * Get statistics about the data to be loaded.
+      * Get statistics about the data to be loaded.  If no statistics are
+      * available, then null should be returned.
       */
      LoadStatistics getStatistics();
  
@@ -110, +128 @@

  
  }}}
  
- !LoadSchema will be a top level object (`org.apache.pig.LoadSchema`) used to 
communicate information about data to be loaded or that is being
+ '''!LoadSchema''' will be a top level object (`org.apache.pig.LoadSchema`) 
used to communicate information about data to be loaded or that is being
  stored.  It is not the same as the existing 
`org.apache.pig.impl.logicalLayer.schema.Schema`.
  
  {{{
      public class LoadSchema {
  
+         int version;
+ 
-         public class FileFieldSchema {
+         public class LoadFieldSchema {
              public String name;
              public DataType type;
              public String description;
+             public LoadFieldSchema schema; // nested tuples and bags will 
have their own schema
          }
  
-         public ColumnSchema[] fields;
+         public LoadFieldSchema[] fields;
          public Map<String, Integer> byName;
  
          enum Order { ASCENDING, DESCENDING }
@@ -132, +153 @@

      }
  }}}
  
+ Feedback from Pradeep:  We must fix the two-level access issues with the schema 
+ of bags in the current schema before we make these changes, otherwise that 
+ same contagion will afflict us here.
+ 
- !LoadStatistics:
+ '''!LoadStatistics'''
- 
  {{{
      public class LoadStatistics {
  
          public class LoadFieldStatistics {
+ 
+             int version;
  
              enum Distribution {UNIFORM, NORMAL, POWER};
  
@@ -154, +179 @@

  
          public long mBytes; // size in megabytes
          public long numRecords;  // number of records
+         public LoadFieldStatistics[] fields;
  
          // Probably more in here
      }
  }}}
  
- An open question for !LoadPushdown is how do we communicate what needs to be 
pushed down?  We could do it via segments of a !LogicalPlan or some
- type of tree.  This isn't very convenient for Map-Reduce, which will also 
want to communicate push downs to !InputFormats.  We could do it via
- SQL like string, but then loaders have to implement a parser.
+ At this point, !LoadStatistics is poorly understood.  In initial versions we 
may choose not to implement it.  In addition to questions
+ on what should be in the statistics, there are questions on how statistics 
should be communicated in relation to partitions.  For example,
+ when loading from a table that is stored in Owl or Hive, one or more 
partitions may be loaded.  Assuming statistics are kept at the
+ partition level, how are these statistics then communicated to Pig?  Is it 
the loader's job to combine the statistics for all of the
+ partitions being read?  Or does it return an array of !LoadStatistics?  But 
if so, what does Pig do with it, since it does not know which
+ tuples belong to which partitions (and doesn't want to know)?  Even worse, on 
store any statistics Pig has to report are across all data
+ being stored.  But the storage function underneath may choose to partition 
the data.  How does it then separate those statistics for the
+ different partitions?  In these cases should store functions be in charge of 
calculating statistics?  Perhaps some statistics that can be
+ easily distributed across partitions (such as distribution types) should be 
calculated and sent down by Pig and some left to the store
+ functions.  Perhaps statistics on store should instead be driven by callbacks 
that Pig would define and implement, and that store functions
+ could call after data has been partitioned if they want to.
  
+ 
- !LoadPushdown:
+ '''!LoadPushdown'''
  {{{
  
  /**
@@ -190, +225 @@

  
      /**
       * Propose a set of operators to push to the loader.
-      * @param plan LogicalPlan containing proposed operators
+      * @param plan Plan containing proposed operators
       * @return true if the loader accepts the plan, false if not.
       * If false is returned Pig may choose to trim the plan and call
       * this method again.
       */
-     boolean pushOperators(LogicalPlan plan);
+     boolean pushOperators(OperatorPlan plan);
  }
  
  }}}
  
- !StoreFunc:
+ An open question for !LoadPushdown is how do we communicate what needs to be 
pushed down?  In the above, !OperatorPlan is envisioned as a simple
+ syntax tree that would be sufficient to communicate operators and their 
expressions to the load functions.  Initially we considered using the
+ !LogicalPlan as is.  But our !LogicalPlan is a mess right now.  It's also 
tightly tied to our implementation, so exposing it as an interface is
+ unwise.  We could use a SQL-like string, but then loaders would have to 
implement a parser.
+ 
+ '''!StoreFunc'''
  
  {{{
  public interface StoreFunc {
  
      /**
-      * Return the OutputFormat associated with StoreFunc.
+      * Return the OutputFormat associated with StoreFunc.  This will be called
+      * on the front end during planning and not on the backend during
+      * execution.  OutputFormat information need not be carried to the back 
end
+      * as the appropriate RecordWriter will be provided to the StoreFunc.
       */
      OutputFormat getOutputFormat();
  
      /**
+      * Communicate to the store function the URIs used in Pig Latin to refer 
+      * to the object(s) being stored.  That is, if the PL script is
+      * <b>store A into 'bla'</b>
+      * then 'bla' is the URI.  Store functions should assume that if no
+      * scheme is provided in the URI it is an hdfs file.  This will be 
+      * called during planning on the front end, not during execution on
+      * the backend.
+      * @param uri URIs referenced in store statement.
+      */
+     void setURI(URI[] uri);
+  
+     /**
+      * Set the schema for data to be stored.  This will be called on the
+      * front end during planning.  If the store function wishes to record
+      * the schema it will need to carry it to the backend.
+      * Even if a store function cannot
+      * record the schema, it may need to implement this function to
-      * Check that a given schema is acceptable to this storage function.
+      * check that a given schema is acceptable to it.  For example, it
-      * This can be used to check that the correct partition keys are included.
+      * can check that the correct partition keys are included;
-      * For a storage function to be written directly to an OutputFormat it can
+      * a storage function written directly atop an OutputFormat can
-      * be used to make sure the schema will translate in a well defined way.
+      * make sure the schema will translate in a well defined way.  
-      * @param schema to be checked
+      * @param schema to be checked/set
       * @throws IOException if this schema is not acceptable.  It should include
       * a detailed error message indicating what is wrong with the schema.
       */
-     void checkSchema(LoadSchema s) throws IOException;
+     void setSchema(LoadSchema s) throws IOException;
+ 
+     /**
+      * Initialize StoreFunc to write data.  This will be called during
+      * execution before the call to putNext.
+      * @param writer RecordWriter to use.
+      */
+     void prepareToWrite(RecordWriter writer);
  
      /**
       * Write a tuple to the output stream to which this instance was
@@ -228, +295 @@

       * @param t the tuple to store.
       * @throws IOException
       */
-     void putNext(Tuple f) throws IOException;
+     void putNext(Tuple t) throws IOException;
  
  }
  
  }}}
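 
 As on the load side, here is an illustrative sketch of a simple tab-delimited 
store function under this proposal.  The class name, the use of 
!TextOutputFormat, and the field-joining logic are assumptions for the example, 
not part of the design.
 
 {{{
 // Illustrative sketch only.
 public class SimpleTextStorer implements StoreFunc {
 
     private RecordWriter writer;
 
     public OutputFormat getOutputFormat() {
         // Delegate the actual writing of records to an existing OutputFormat.
         return new TextOutputFormat();
     }
 
     public void setURI(URI[] uri) {
         // A real storer would remember the location(s); this sketch relies
         // on Pig and Hadoop to set up the output path.
     }
 
     public void setSchema(LoadSchema s) throws IOException {
         // This storer accepts any schema, so there is nothing to check and
         // nothing to record.
     }
 
     public void prepareToWrite(RecordWriter writer) {
         this.writer = writer;
     }
 
     public void putNext(Tuple t) throws IOException {
         // Join the fields with tabs and hand the line to the RecordWriter.
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < t.size(); i++) {
             if (i > 0) sb.append('\t');
             sb.append(t.get(i));
         }
         try {
             writer.write(null, new Text(sb.toString()));
         } catch (InterruptedException e) {
             throw new IOException(e);
         }
     }
 }
 }}}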
  
- !StoreMetadata
+ '''!StoreMetadata'''
  
  {{{
   /**
@@ -245, +312 @@

  interface StoreMetadata {
  
      /**
-      * Set a schema for the data being written.
-      */
-     void setSchema(LoadSchema s);
- 
-     /**
       * Set statistics about the data being written.
       */
      void setStatistics(LoadStatistics stats);
@@ -258, +320 @@

  
  }}}
  
+ Given the uncertainty noted above under !LoadStatistics on how statistics 
should be stored, it is not clear that this interface makes sense.
-    
- 
  
  == LoadFunc and InputFormat Interaction ==
  
@@ -268, +329 @@

  which processing pipeline a tuple from a given input is assigned to.  
Currently load functions also duplicate much of the functionality of
  Hadoop's !RecordReader interface.  Currently load functions are provided a 
data stream and asked to parse both records and fields within records.
  !RecordReader already provides the functionality to parse out records, though 
it always returns two values, key and value, not a tuple of any
- number of values.  The !LoadFunc interface cannot be removed because it still 
needs to parse out fields within a record.  But an integral part of
+ number of values.  An integral part of
- this proposal is to change the !LoadFunc to call !RecordReader.getKey and 
getValue and then parse out fields in the tuple from that
+ this proposal is to change the !LoadFunc to call !RecordReader.getCurrentKey 
and getCurrentValue and then parse out fields in the tuple from that
  result.
  
  Since Pig still needs to add information to !InputSplits, user provided 
!InputFormats and !InputSplits cannot be used directly.  Instead, the
@@ -282, +343 @@

  getNext will then take the key and value provided by the associated 
!RecordReader and construct a two field tuple.  These types will be converted
  to Pig types as follows:
  
+ || Writable         || Pig type   || Comment ||
+ || Text             || chararray  ||         ||
+ || !IntWritable     || integer    ||         ||
+ || V!IntWritable    || integer    ||         ||
+ || !LongWritable    || long       ||         ||
+ || V!LongWritable   || long       ||         ||
+ || !FloatWritable   || float      ||         ||
+ || !DoubleWritable  || double     ||         ||
+ || !BooleanWritable || int        || In the future if Pig exposes boolean as a first class type, this would change to boolean ||
+ || !ByteWritable    || int        ||         ||
+ || !NullWritable    || null       ||         ||
+ || All others       || byte array ||         ||
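 
 A sketch of how !InputFormatLoader.getNext might apply this mapping.  This is 
illustrative only; in particular, how the bytes of an arbitrary Writable are 
obtained for the byte array case is left open here.
 
 {{{
     private RecordReader reader;   // provided via prepareToRead
     private TupleFactory tupleFactory = TupleFactory.getInstance();
 
     public Tuple getNext() throws IOException {
         try {
             if (!reader.nextKeyValue()) return null;
             // Build the two-field tuple: (key, value).
             Tuple t = tupleFactory.newTuple(2);
             t.set(0, translate(reader.getCurrentKey()));
             t.set(1, translate(reader.getCurrentValue()));
             return t;
         } catch (InterruptedException e) {
             throw new IOException(e);
         }
     }
 
     // Apply the Writable-to-Pig-type mapping from the table above.
     private Object translate(Object w) {
         if (w instanceof Text) return ((Text) w).toString();
         if (w instanceof IntWritable) return ((IntWritable) w).get();
         if (w instanceof VIntWritable) return ((VIntWritable) w).get();
         if (w instanceof LongWritable) return ((LongWritable) w).get();
         if (w instanceof VLongWritable) return ((VLongWritable) w).get();
         if (w instanceof FloatWritable) return ((FloatWritable) w).get();
         if (w instanceof DoubleWritable) return ((DoubleWritable) w).get();
         if (w instanceof BooleanWritable) return ((BooleanWritable) w).get() ? 1 : 0;
         if (w instanceof ByteWritable) return (int) ((ByteWritable) w).get();
         if (w instanceof NullWritable) return null;
         // All others become byte arrays; this sketch just uses the Writable's
         // string form, a real loader would use its serialized bytes.
         return new DataByteArray(w.toString().getBytes());
     }
 }}}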
  
  Since the formats of any other types are unknown to Pig and cannot be 
generalized, it does not make sense to provide casts from byte array to Pig
  types via a !LoadCaster.  If users wish to use an !InputFormat that uses 
types beyond these and cast them to Pig types, they can extend the
@@ -322, +383 @@

  
  However, in some places Pig needs this position information.  In particular, 
when building an index for a merge join, Pig needs a way to mark a
  location in an input while building the index and then return to that 
position during the join.  This issue will have to be pursued with the MR
- team to see if there is a way to provide this functionality for input types 
where it makes sense.
+ team to see if there is a way to provide this functionality for input types 
where it makes sense.  If they are unwilling to provide it, or it will
+ take them some time to provide it, we could instead create our own 
!SeekableInputFormat that would define a way to mark and seek.  Zebra could
+ implement this for their !InputFormat.  The Pig team could extend 
!TextInputFormat to implement it for text files.  !PigStorage would then use 
this
+ new !SeekableTextInputFormat rather than using !TextInputFormat directly.
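 
 If we end up defining this ourselves, one possible shape (purely hypothetical; 
the proposal only names !SeekableInputFormat without defining it) would be for 
such an !InputFormat to hand back record readers that implement something like:
 
 {{{
 // Hypothetical sketch only.
 interface SeekableRecordReader {
     /**
      * Return an opaque marker for the reader's current position, recorded
      * while building a merge join index.
      */
     long getPosition() throws IOException;
 
     /** Return to a position previously obtained from getPosition. */
     void seek(long position) throws IOException;
 }
 }}}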
  
  These changes will affect the !SamplableLoader interface.  Currently it uses 
skip and getPos to move the underlying stream so that it can pick
  up a sample of tuples out of a block.  Since it would sit atop !InputFormat 
it would no longer have access to the underlying stream.  It could be
@@ -333, +397 @@

  These loaders will need to know how to read at least the first record on 
their own, without the benefit of the underlying !InputFormat, since
  they will need to call this on the front end where an !InputFormat has not 
yet been used.
  
- Open question:  for cases where we want to open the file as a side file, 
rather than as the main MR file, is it still possible to use an IF to
- read it?  Pig needs to read these side files and still parse them into 
tuples.  It will have to be able to invoke the IF/IS for this file.
+ In addition to opening files as part of Map-Reduce, Pig loaders also open 
files on the side in MR jobs.  The new load interface needs to be able to
+ open these side files as well.  According to Arun, this is doable by 
creating a new instance of the appropriate !InputFormat, calling getSplits, and
+ then creating a !RecordReader on it.
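 
 A sketch of the side-file reading Arun describes, using only the new Hadoop 
map-reduce API (org.apache.hadoop.mapreduce).  Configuration setup and error 
handling are elided, and !TextInputFormat stands in for whatever !InputFormat 
the loader returns:
 
 {{{
     // Drive an InputFormat by hand to read a side file, outside of the
     // normal map task machinery.
     void readSideFile(Configuration conf, Path sideFile)
             throws IOException, InterruptedException {
         Job job = new Job(conf);
         FileInputFormat.setInputPaths(job, sideFile);
         InputFormat<LongWritable, Text> inputFormat = new TextInputFormat();
 
         for (InputSplit split : inputFormat.getSplits(job)) {
             TaskAttemptContext context =
                 new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
             RecordReader<LongWritable, Text> reader =
                 inputFormat.createRecordReader(split, context);
             reader.initialize(split, context);
             while (reader.nextKeyValue()) {
                 // hand getCurrentKey()/getCurrentValue() to the load function
             }
             reader.close();
         }
     }
 }}}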
  
  '''Performance concerns.'''
  
@@ -353, +418 @@

  
  To support arbitrary !OutputFormats, a new storage function 
!OutputFormatStorage will be written that will take an !OutputFormat as a 
constructor
  argument.  Tuples to be stored by this storage function must have either one 
or two fields.  If they have two fields, the first will be taken
- to be the key, and the second the value.  If they have one, the key will be 
set to null and the value will be taken from the single field.
+ to be the key, and the second the value.  If they have one, the key will be 
set to null and the value will be taken from the single field.  Data
+ type conversion on this data will be done in the same way as noted above for 
!InputFormatLoader.
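 
 A sketch of how !OutputFormatStorage.putNext might implement this.  It is 
illustrative only; the conversion from Pig types back to Writables would mirror 
the table given for !InputFormatLoader, and only a few cases are shown.
 
 {{{
     private RecordWriter writer;   // provided via prepareToWrite
 
     public void putNext(Tuple t) throws IOException {
         if (t.size() < 1 || t.size() > 2) {
             throw new IOException(
                 "OutputFormatStorage expects tuples of one or two fields");
         }
         // One field: null key, the field is the value.  Two fields: the
         // first is the key, the second the value.
         Object key = (t.size() == 2) ? toWritable(t.get(0)) : null;
         Object value = toWritable(t.get(t.size() - 1));
         try {
             writer.write(key, value);
         } catch (InterruptedException e) {
             throw new IOException(e);
         }
     }
 
     // Reverse the Writable-to-Pig-type mapping; only a few cases shown.
     private Object toWritable(Object field) {
         if (field == null) return NullWritable.get();
         if (field instanceof String) return new Text((String) field);
         if (field instanceof Integer) return new IntWritable((Integer) field);
         if (field instanceof Long) return new LongWritable((Long) field);
         // ... remaining conversions follow the earlier table ...
         throw new IllegalArgumentException(
             "conversion not shown in this sketch: " + field.getClass());
     }
 }}}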
  
+ Open Questions:
+  1. Does all this force us to switch to Hadoop for local mode as well?  We 
aren't opposed to using Hadoop for local mode; it just needs to get reasonably 
fast.  Can we use !InputFormat ''et al.'' on local files without using the 
whole HDFS structure?
+ 
