= Proposed Redesign For Load, Store, and Slicer in Pig =

== Goals ==
The current design of the !LoadFunc, !StoreFunc, and Slicer interfaces in Pig is not
adequate.  This proposed redesign has the following goals:

 1. The Slicer interface is redundant.  Remove it and allow users to directly 
use Hadoop !InputFormats in Pig.
 1. It is not currently easy to use a separate !OutputFormat for a !StoreFunc.  This should be made easy, so that users can store data in locations other than HDFS.
 1. Users who wish to work with both Pig and Map-Reduce are currently required to write a Hadoop !InputFormat and !OutputFormat as well as Pig load and store functions.  While Pig load and store functions will always be necessary to take the most advantage of Pig, it would be good for users to be able to use Hadoop !InputFormat and !OutputFormat classes directly to minimize the data interchange cost.
 1. The major difference between a Hadoop !InputFormat and a Pig load function is the data model: Hadoop views data as key-value pairs, Pig as tuples.  The same holds for !OutputFormat and store functions.
 1. New storage formats such as Zebra are being implemented for Hadoop that include metadata such as schemas.  The !LoadFunc interface needs to allow Pig to obtain this metadata.  There is a determineSchema call in the current interface; more functions may be necessary.
 1. These new storage formats also plan to support pushing of, at least, projection and selection into the storage layer.  Pig needs to be able to query loaders to determine what, if any, pushdown capabilities they support and then make use of those capabilities.
 1. There already exists one metadata system in Hadoop (Hive's metastore) and 
there is a proposal to add another (Owl).  Pig needs to be able to query these 
metadata systems for information about data to be read.  It also needs to be 
able to record information to these metadata systems when writing data.  The 
load and store functions are a reasonable place to do these operations since 
that is the point at which Pig is reading and writing data.  This will also 
allow Pig to read and write data from and to multiple metadata stores in a single
Pig Latin script if that is desired.

A requirement for the implementation that does not fit into the goals above: while
the existing Pig implementation is tightly tied to Hadoop (and is becoming more
tightly tied all the time), we do not want to tie Pig Latin itself tightly to
Hadoop.  Therefore, while we plan to allow users to easily interact with Hadoop
!InputFormats and !OutputFormats, these should not be exposed as such to Pig Latin.
Pig Latin must still view these as load and store functions; only the underlying
implementation will know that they are Hadoop classes and handle them appropriately.

== Interfaces ==
With these proposed changes, load and store functions in Pig are becoming very
weighty objects.  The current !LoadFunc interface already provides mechanisms for
reading the data, getting some schema information, casting data, and some
placeholders for pushing down projections into the loader.  This proposal will add
more file-level metadata, global metadata, and selection pushdown, plus interaction
with !InputFormats.  It will also add !OutputFormats to store functions.  If we
create two monster interfaces that attempt to provide everything, the burden of
creating a new load or store function in Pig will become overwhelming.  Instead,
this proposal envisions splitting the interface into a number of interfaces, each
with a clear responsibility.  Load and store functions will then only be required
to implement the interfaces for the functionality they offer.

For load functions:
 * !LoadFunc will be pared down to just contain functions directly associated 
with reading data, such as getNext.
 * A new !LoadCaster interface will be added.  This interface will contain all of the bytesToX methods currently in !LoadFunc.  !LoadFunc will add a getLoadCaster routine that will return an object that can provide casts.  The existing !Utf8StorageConverter class will change to implement this interface.  Load functions will then be free to use this class as their caster or provide their own.  Existing load functions that already provide all of the bytesToX methods can implement the !LoadCaster interface and return themselves from getLoadCaster.  If a loader does not provide a !LoadCaster, casts from byte array to other Pig types will not be supported for data loaded via that loader.
 * A new !LoadMetadata interface will be added.  Calls that find metadata about 
the data being loaded, such as determineSchema, will be placed in this 
interface.  If a loader does not implement this interface, then Pig will assume 
that no metadata is obtainable for this data.
 * A new !LoadPushDown interface will be added.  Calls that determine what can be pushed down, and that push that functionality into the loader, will be placed in this interface.  If a loader does not implement this interface, then Pig will assume that the loader is not capable of pushing down any functionality.

For store functions:
 * A new method getOutputFormat will be added to !StoreFunc to allow a storage 
function to return its output format.
 * A new interface !StoreMetadata will be added to provide a way for storage functions to record metadata.  If a given storage function does not implement this interface, Pig will assume that it is unable to record metadata.

=== Details ===

!LoadFunc:

{{{
/**
 * This interface is used to implement functions to parse records
 * from a dataset.
 */
public interface LoadFunc {
    /**
     * Retrieves the next tuple to be processed.
     * @return the next tuple to be processed or null if there are no more
     * tuples to be processed.
     * @throws IOException
     */
    Tuple getNext() throws IOException;

    /**
     * Communicate to the loader the URI used in Pig Latin to refer to the 
     * object being loaded.  That is, if the Pig Latin script is
     * <b>A = load 'bla'</b>
     * then 'bla' is the URI.  Load functions should assume that if no
     * scheme is provided in the URI, it refers to an HDFS file.
     * @param uri URI referenced in load statement
     */
    void setURI(URI uri);
    
    /**
     * Return the InputFormat associated with this loader.
     */
    InputFormat getInputFormat();

    /**
     * Return the LoadCaster associated with this loader.  Returning
     * null indicates that casts from byte array are not supported
     * for this loader.
     */
    LoadCaster getLoadCaster();
}

}}}

The !LoadCaster interface will include the bytesToInteger, bytesToLong, and related
functions currently in !LoadFunc.  !Utf8StorageConverter will implement this interface.
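
As a rough sketch, !LoadCaster might look like the following.  The method names here
mirror the bytesToX methods in today's !LoadFunc and are illustrative only; Tuple and
DataBag come from org.apache.pig.data, Map from java.util.

{{{
/**
 * Illustrative sketch only: LoadCaster collects the bytesToX methods that
 * currently live in LoadFunc.
 */
public interface LoadCaster {
    Integer bytesToInteger(byte[] b) throws IOException;
    Long bytesToLong(byte[] b) throws IOException;
    Float bytesToFloat(byte[] b) throws IOException;
    Double bytesToDouble(byte[] b) throws IOException;
    String bytesToCharArray(byte[] b) throws IOException;
    Map<Object, Object> bytesToMap(byte[] b) throws IOException;
    Tuple bytesToTuple(byte[] b) throws IOException;
    DataBag bytesToBag(byte[] b) throws IOException;
}
}}}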

!LoadMetadata:
{{{

/**
 * This interface defines how to retrieve metadata related to data to be
 * loaded.  If a given loader does not implement this interface, it will be
 * assumed that it is unable to provide metadata about the associated data.
 */
public interface LoadMetadata {

    /**
     * Get a schema for the data to be loaded.  This schema should represent
     * all tuples of the returned data.  If the schema is unknown or it is
     * not possible to return a schema that represents all returned data,
     * then null should be returned.
     */
    LoadSchema getSchema();

    /**
     * Get statistics about the data to be loaded.
     */
    LoadStatistics getStatistics();

}

}}}

!LoadSchema will be a top level object (`org.apache.pig.LoadSchema`) used to 
communicate information about data to be loaded or that is being
stored.  It is not the same as the existing 
`org.apache.pig.impl.logicalLayer.schema.Schema`.

{{{
    public class LoadSchema {

        public class FileFieldSchema {
            public String name;
            public DataType type;
            public String description;
        }

        public FileFieldSchema[] fields;
        public Map<String, Integer> byName;

        enum Order { ASCENDING, DESCENDING }
        public int[] sortKeys; // each entry is an offset into the fields array.
        public Order[] sortKeyOrders; 
        public int[] partitionKeys; // each entry is an offset into the fields array
    }
}}}

!LoadStatistics:

{{{
    public class LoadStatistics {

        public class LoadFieldStatistics {

            enum Distribution {UNIFORM, NORMAL, POWER};

            // number of distinct values represented in this field
            public long numDistinctValues;
            // how values in this field are distributed
            public Distribution distribution;

            // We need some way to represent a histogram of values in the field,
            // as those will be useful.  However, we can't count on being
            // able to hold such histograms in memory.  Have to figure out
            // how they can be kept on disk and represented here.

            // Probably more in here
        }

        public long mBytes; // size in megabytes
        public long numRecords;  // number of records

        // Probably more in here
    }
}}}
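
To illustrate the contract, this is roughly how Pig's front end might probe a loader
for metadata.  instantiateLoader and loadStatement are hypothetical names standing in
for however Pig constructs the loader instance; only the interface usage matters here.

{{{
// Sketch: probing a loader for optional metadata on the front end.
LoadFunc loader = instantiateLoader(loadStatement);   // hypothetical helper
LoadSchema schema = null;
LoadStatistics stats = null;
if (loader instanceof LoadMetadata) {
    LoadMetadata metadata = (LoadMetadata) loader;
    schema = metadata.getSchema();       // null means the schema is unknown
    stats = metadata.getStatistics();    // null means no statistics available
}
// If the loader does not implement LoadMetadata (or returns null), Pig
// proceeds as it does today, treating the data as schemaless.
}}}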

An open question for !LoadPushDown is how to communicate what needs to be pushed
down.  We could do it via segments of a !LogicalPlan or some type of tree, but that
is not very convenient for Map-Reduce, which will also want to communicate pushdowns
to !InputFormats.  We could do it via a SQL-like string, but then loaders would have
to implement a parser.

!LoadPushdown:
{{{

/**
 * This interface defines how to communicate to Pig what functionality can
 * be pushed into the loader.  If a given loader does not implement this
 * interface, it will be assumed that it is unable to accept any functionality
 * for push down.
 */
public interface LoadPushDown {

    /**
     * Set of possible operations that Pig can push down to a loader. 
     */
    enum OperatorSet {PROJECTION, SELECTION};

    /**
     * Determine the operators that can be pushed to the loader.  
     * Note that by indicating a loader can accept a certain operator
     * (such as selection) the loader is not promising that it can handle
     * all selections.  When it is passed the actual operators to 
     * push down it will still have a chance to reject them.
     * @return list of all features that the loader can support
     */
    List<OperatorSet> getFeatures();

    /**
     * Propose a set of operators to push to the loader.
     * @param plan LogicalPlan containing proposed operators
     * @return true if the loader accepts the plan, false if not.
     * If false is returned Pig may choose to trim the plan and call
     * this method again.
     */
    boolean pushOperators(LogicalPlan plan);
}

}}}
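
Whatever form the communication takes, the negotiation would look roughly like this
on Pig's side.  proposedPlan stands in for however the pushable operators end up
being expressed, which is exactly the open question above.

{{{
// Sketch: offering pushdown work to a loader that advertises support for it.
if (loader instanceof LoadPushDown) {
    LoadPushDown pushDown = (LoadPushDown) loader;
    if (pushDown.getFeatures().contains(LoadPushDown.OperatorSet.PROJECTION)) {
        // proposedPlan is the fragment (here a projection) Pig would like
        // the loader to absorb.
        boolean accepted = pushDown.pushOperators(proposedPlan);
        if (!accepted) {
            // The loader rejected this particular plan.  Pig may trim the
            // plan and try again, or keep the projection in its own pipeline.
        }
    }
}
}}}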

!StoreFunc:

{{{
public interface StoreFunc {

    /**
     * Return the OutputFormat associated with StoreFunc.
     */
    OutputFormat getOutputFormat();

    /**
     * Check that a given schema is acceptable to this storage function.
     * This can be used to check that the correct partition keys are included.
     * For a storage function that writes directly to an OutputFormat, it can
     * be used to make sure the schema will translate in a well-defined way.
     * @param s the schema to be checked
     * @throws IOException if this schema is not acceptable.  It should include
     * a detailed error message indicating what is wrong with the schema.
     */
    void checkSchema(LoadSchema s) throws IOException;

    /**
     * Write a tuple to the output stream to which this instance was
     * previously bound.
     * 
     * @param f the tuple to store.
     * @throws IOException
     */
    void putNext(Tuple f) throws IOException;

}

}}}
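
For example, a storage function that writes key/value pairs (such as the
!OutputFormatStorage described later in this document) might implement checkSchema
roughly as follows.  This is only a sketch of one possible use.

{{{
// Sketch: a checkSchema() for a storage function that only accepts
// one-field (value) or two-field (key, value) tuples.
public void checkSchema(LoadSchema s) throws IOException {
    if (s == null || s.fields == null) {
        return;   // no schema known; accept and rely on runtime checks
    }
    if (s.fields.length < 1 || s.fields.length > 2) {
        throw new IOException("Expected a schema of one (value) or two "
                + "(key, value) fields, found " + s.fields.length);
    }
}
}}}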

!StoreMetadata

{{{
/**
 * This interface defines how to write metadata about the data being stored.
 * If a given store function does not implement this interface, it will be
 * assumed that it is unable to record metadata about the associated data.
 */
public interface StoreMetadata {

    /**
     * Set a schema for the data being written.
     */
    void setSchema(LoadSchema s);

    /**
     * Set statistics about the data being written.
     */
    void setStatistics(LoadStatistics stats);

}

}}}

   


== LoadFunc and InputFormat Interaction ==

The Slicer and Slice interfaces duplicate much of the !InputFormat and !InputSplit
interfaces provided by Hadoop.  The current !SliceWrapper helper class interacts
with these slices to encode information about input sources into each !InputSplit,
so that Pig can correctly manage which processing pipeline a tuple from a given
input is assigned to.  Load functions also duplicate much of the functionality of
Hadoop's !RecordReader interface: they are currently handed a data stream and asked
to parse both records and fields within records.  !RecordReader already provides the
functionality to parse out records, though it always returns two values, a key and a
value, rather than a tuple of any number of fields.  The !LoadFunc interface cannot
be removed because it still needs to parse out fields within a record.  But an
integral part of this proposal is to change !LoadFunc to call
!RecordReader.getCurrentKey and getCurrentValue and then parse the fields of the
tuple from that result.
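
For a delimited-text loader, getNext built on top of a !RecordReader might look
roughly like this.  This is a sketch only; how the reader is handed to the loader is
not settled by this proposal, so it is simply assumed to be a field here.

{{{
// Sketch: getNext() built on a RecordReader instead of a raw input stream.
// The reader is assumed to be a LineRecordReader handed to the loader by
// Pig; parsing fields out of the record stays in the loader.
public Tuple getNext() throws IOException {
    try {
        if (!reader.nextKeyValue()) {
            return null;                           // no more records
        }
        Text line = (Text) reader.getCurrentValue();
        String[] fields = line.toString().split("\t");
        Tuple t = TupleFactory.getInstance().newTuple(fields.length);
        for (int i = 0; i < fields.length; i++) {
            // leave fields as byte arrays; casts go through the LoadCaster
            t.set(i, new DataByteArray(fields[i]));
        }
        return t;
    } catch (InterruptedException e) {
        throw new IOException(e);
    }
}
}}}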

Since Pig still needs to add information to !InputSplits, user-provided !InputFormats
and !InputSplits cannot be used directly.  Instead, the proposal is to change
!PigInputFormat to contain an !InputFormat.  !PigInputFormat will return
!PigInputSplits, each of which contains an !InputSplit.  In addition, !PigInputSplit
will contain the necessary information to allow Pig to route tuples to the correct
data processing pipeline.
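
The shape !PigInputSplit might take is sketched below.  The field names are
illustrative; !OperatorKey is Pig's existing identifier for operators in a plan, and
serialization details are omitted.

{{{
// Sketch only: PigInputSplit wraps the user's InputSplit and carries the
// Pig-specific routing information alongside it.
public class PigInputSplit extends InputSplit {
    private InputSplit wrappedSplit;      // the user's split
    private int inputIndex;               // which load statement this split belongs to
    private List<OperatorKey> targetOps;  // pipeline(s) tuples from this split should feed

    public long getLength() throws IOException, InterruptedException {
        return wrappedSplit.getLength();
    }

    public String[] getLocations() throws IOException, InterruptedException {
        return wrappedSplit.getLocations();
    }
}
}}}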

In order to support arbitrary Hadoop !InputFormats, it will be necessary to construct
a load function, !InputFormatLoader, that takes an !InputFormat as a constructor
argument.  When asked by Pig which !InputFormat to use, it will return the one
indicated by the user.  Its getNext implementation will then take the key and value
provided by the associated !RecordReader and construct a two-field tuple.  These
types will be converted to Pig types as follows:

|| Writable         || Pig type   || Comment ||
|| Text             || chararray  || ||
|| !IntWritable     || integer    || ||
|| !VIntWritable    || integer    || ||
|| !LongWritable    || long       || ||
|| !VLongWritable   || long       || ||
|| !FloatWritable   || float      || ||
|| !DoubleWritable  || double     || ||
|| !BooleanWritable || int        || In the future, if Pig exposes boolean as a first class type, this would change ||
|| !ByteWritable    || int        || ||
|| !NullWritable    || null       || ||
|| All others       || byte array || ||

Since the format of any other types is unknown to Pig and cannot be generalized, it
does not make sense to provide casts from byte array to Pig types via a !LoadCaster.
If users wish to use an !InputFormat that uses types beyond these and cast them to
Pig types, they can extend !InputFormatLoader and return a custom-written
!LoadCaster that handles casting their data types.
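
For example, a user with a custom Writable type could do something along these lines.
MyInputFormat and MyWritableCaster are illustrative names, not part of this proposal.

{{{
// Sketch: extending InputFormatLoader to supply a caster for a custom
// Writable type that Pig would otherwise treat as an opaque byte array.
public class MyInputFormatLoader extends InputFormatLoader {

    public MyInputFormatLoader() {
        super(new MyInputFormat());     // the user's InputFormat
    }

    @Override
    public LoadCaster getLoadCaster() {
        // MyWritableCaster implements LoadCaster and knows how to turn the
        // serialized bytes of MyWritable into Pig types.
        return new MyWritableCaster();
    }
}
}}}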

=== Details ===
Currently, in addition to the information regularly passed with an !InputSplit, Pig
also passes along additional information via the !SliceWrapper (Pig's implementation
of !InputSplit).  This includes an index (it is not clear what this is for, and there
are no calls to !SliceWrapper.getIndex() in the code; comments in code sure would be
nice), an exec type to indicate MR or local mode, and the list of target operators
(the pipeline) for this input.  These items can continue to be carried along as-is in
the proposed !PigInputSplit.

Pig's Slice interface currently supports the following calls:
 * getLocations - will be replaced by !InputSplit.getLocations
 * init - replaced by !RecordReader.initialize
 * getStart - see below
 * getLength - replaced by !InputSplit.getLength
 * close - replaced by !RecordReader.close
 * getPos - see below
 * getProgress - replaced by !RecordReader.getProgress
 * next - replaced by !RecordReader.getCurrentKey and getCurrentValue

Positioning information presents a problem.  Hadoop 0.18 has a getPos call in its
!RecordReader, but it has been removed in 0.20.  The reason is that input from files
can generally be assigned a position, though it may not always be accurate, as in
the bzip case, whereas for some input formats a position may have no meaning at all.
Even if Pig does not switch to using !InputFormats it will have to deal with this
issue, just as MR has.

However, in some places Pig needs this position information.  In particular, 
when building an index for a merge join, Pig needs a way to mark a
location in an input while building the index and then return to that position 
during the join.  This issue will have to be pursued with the MR
team to see if there is a way to provide this functionality for input types 
where it makes sense.

These changes will affect the !SamplableLoader interface.  Currently it uses 
skip and getPos to move the underlying stream so that it can pick
up a sample of tuples out of a block.  Since it would now sit atop an !InputFormat,
it would no longer have access to the underlying stream.  It could be
changed instead to skip a number of tuples.  Rather than skipping a uniform 
amount inside the block it could skip a random number of tuples each
time.  This will result in a better sample, but also risks running out of data 
before obtaining the desired number of tuples.
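
A fragment sketching the skip-by-tuples idea follows; loader, sampleSize, and maxSkip
are assumed to be supplied by the sampling job and are illustrative only.

{{{
// Sketch: build a sample by skipping a random number of tuples between the
// tuples that are kept, instead of seeking a fixed number of bytes.
Random random = new Random();
List<Tuple> sample = new ArrayList<Tuple>();
Tuple t;
while (sample.size() < sampleSize && (t = loader.getNext()) != null) {
    sample.add(t);
    // skip a random number of tuples before keeping the next one;
    // maxSkip controls the expected sampling rate
    int skip = random.nextInt(maxSkip);
    for (int i = 0; i < skip; i++) {
        if (loader.getNext() == null) {
            break;          // ran out of data before the sample filled up
        }
    }
}
}}}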

These changes will also affect loaders that need to read a record in order to 
determine their schema, such as !BinStorage or a JSON loader.
These loaders will need to know how to read at least the first record on their own,
without the benefit of the underlying !InputFormat, since they will need to do this
on the front end, where an !InputFormat has not yet been instantiated.

Open question: for cases where we want to open a file as a side file, rather than as
the main MR input, is it still possible to use an !InputFormat to read it?  Pig needs
to read these side files and still parse them into tuples, so it will have to be able
to invoke the !InputFormat and !InputSplit for such a file.

'''Performance concerns.'''

With this proposal we are strongly encouraging users to double-parse their data
(since they will parse out the records and then parse those records into fields).
This could carry a performance penalty.  Also, we are removing the ability to seek
when sampling, which could also carry a performance penalty.  A few thoughts on this:
 1. We are already in the process of moving !PigStorage to use !LineRecordReader underneath, and we are getting a 30% speedup for doing it.  This is only one case, but at least in this case double parsing is not hurting us.
 1. If users who are writing both an !InputFormat and a load function are concerned about double parsing, there is nothing stopping them from having their !InputFormat track where field boundaries are and pass that information along to their loader so that they can parse in a single pass.
 1. For sampling, it is not clear how severe the penalty is for removing seek, since Hadoop is reading much of the information off disk into its buffers anyway, and a sequential read on disk is not that much more expensive than a seek.

All this said, before committing to this we should do some prototyping and 
performance testing to convince ourselves that these changes will be
performance neutral.

== StoreFunc and OutputFormat Interaction ==
In the same way that !LoadFunc currently duplicates some functionality of
!InputFormat, !StoreFunc duplicates some functionality of !OutputFormat.  !StoreFunc
will be changed to deal primarily with converting a tuple to a key-value pair that
can be stored by Hadoop.

To support arbitrary !OutputFormats, a new storage function 
!OutputFormatStorage will be written that will take an !OutputFormat as a 
constructor
argument.  Tuples to be stored by this storage function must have either one or two
fields.  If they have two fields, the first will be taken to be the key and the
second the value.  If they have one field, the key will be set to null and the value
will be taken from the single field.
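
The heart of that mapping would be a putNext along these lines.  This is a sketch;
writer stands in for whatever !RecordWriter the wrapped !OutputFormat provides, and
how it reaches the store function is an implementation detail not settled here.

{{{
// Sketch: OutputFormatStorage.putNext() maps a one- or two-field tuple onto
// the key/value pair expected by the wrapped OutputFormat's RecordWriter.
public void putNext(Tuple f) throws IOException {
    try {
        if (f.size() == 2) {
            writer.write(f.get(0), f.get(1));      // (key, value)
        } else if (f.size() == 1) {
            writer.write(null, f.get(0));          // key is null
        } else {
            throw new IOException("OutputFormatStorage expects tuples of "
                    + "one or two fields, got " + f.size());
        }
    } catch (InterruptedException e) {
        throw new IOException(e);
    }
}
}}}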
