Dear Wiki user, You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.
The "LoadStoreRedesignProposal" page has been changed by PradeepKamath. http://wiki.apache.org/pig/LoadStoreRedesignProposal?action=diff&rev1=41&rev2=42 -------------------------------------------------- public interface LoadMetadata { /** - * Get a schema for the data to be loaded. + * Get a schema for the data to be loaded. - * @param location Location as returned by + * @param location Location as returned by - * {...@link LoadFunc#relativeToAbsolutePath(String, String)} + * {...@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)} - * @param conf The {...@link Configuration} object + * @param job The {...@link Job} object - this should be used only to obtain + * cluster properties through {...@link Job#getConfiguration()} and not to set/query + * any runtime job information. * @return schema for the data to be loaded. This schema should represent * all tuples of the returned data. If the schema is unknown or it is * not possible to return a schema that represents all returned data, * then null should be returned. * @throws IOException if an exception occurs while determining the schema */ - ResourceSchema getSchema(String location, Configuration conf) throws + ResourceSchema getSchema(String location, Job job) throws IOException; /** * Get statistics about the data to be loaded. If no statistics are * available, then null should be returned. - * @param location Location as returned by + * @param location Location as returned by - * {...@link LoadFunc#relativeToAbsolutePath(String, String)} + * {...@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)} - * @param conf The {...@link Configuration} object + * @param job The {...@link Job} object - this should be used only to obtain + * cluster properties through {...@link Job#getConfiguration()} and not to set/query + * any runtime job information. * @return statistics about the data to be loaded. If no statistics are * available, then null should be returned. * @throws IOException if an exception occurs while retrieving statistics */ - ResourceStatistics getStatistics(String location, Configuration conf) + ResourceStatistics getStatistics(String location, Job job) throws IOException; /** * Find what columns are partition keys for this input. - * @param location Location as returned by + * @param location Location as returned by - * {...@link LoadFunc#relativeToAbsolutePath(String, String)} + * {...@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)} - * @param conf The {...@link Configuration} object + * @param job The {...@link Job} object - this should be used only to obtain + * cluster properties through {...@link Job#getConfiguration()} and not to set/query + * any runtime job information. - * @return array of field names of the partition keys. + * @return array of field names of the partition keys. Implementations + * should return null to indicate that there are no partition keys * @throws IOException if an exception occurs while retrieving partition keys */ - String[] getPartitionKeys(String location, Configuration conf) + String[] getPartitionKeys(String location, Job job) throws IOException; /** * Set the filter for partitioning. It is assumed that this filter * will only contain references to fields given as partition keys in - * getPartitionKeys + * getPartitionKeys. So if the implementation returns null in + * {...@link #getPartitionKeys(String, Job)}, then this method is not + * called by pig runtime. 
@@ -448, +458 @@

{{{
/**
- * This interface is used to implement functions to write records
+ * This abstract class is used to implement functions to write records
 * from a dataset.
 */
-public interface StoreFunc {
+public abstract class StoreFunc implements StoreFuncInterface {

    /**
     * This method is called by the Pig runtime in the front end to convert the
     * output location to an absolute path if the location is relative. The
     * StoreFunc implementation is free to choose how it converts a relative
     * location to an absolute location since this may depend on what the location
-     * string represent (hdfs path or some other data source)
+     * string represents (hdfs path or some other data source).
     *
     * @param location location as provided in the "store" statement of the script
     * @param curDir the current working directory based on any "cd" statements
     * in the script before the "store" statement. If there are no "cd" statements
     * in the script, this would be the home directory -
     * <pre>/user/<username> </pre>
     * @return the absolute location based on the arguments passed
     * @throws IOException if the conversion is not possible
     */
+    @Override
-    String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException;
+    public String relToAbsPathForStoreLocation(String location, Path curDir)
+    throws IOException {
+        return LoadFunc.getAbsolutePath(location, curDir);
+    }

    /**
     * Return the OutputFormat associated with StoreFunc. This will be called
     * on the front end during planning and not on the backend during
     * execution.
     * @return the {@link OutputFormat} associated with StoreFunc
     * @throws IOException if an exception occurs while constructing the
     * OutputFormat
     */
-    OutputFormat getOutputFormat() throws IOException;
+    public abstract OutputFormat getOutputFormat() throws IOException;
    /**
-     * Communicate to the store function the location used in Pig Latin to refer
-     * to the object(s) being stored. That is, if the PL script is
-     * <b>store A into 'bla'</b>
-     * then 'bla' is the location. This location should be either a file name
-     * or a URI. If it does not have a URI scheme Pig will assume it is a
-     * filename. This will be called during planning on the front end, not
-     * during execution on the backend.
-     * @param location Location indicated in store statement.
+     * Communicate to the storer the location where the data needs to be stored.
+     * The location string passed to the {@link StoreFunc} here is the return
+     * value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}.
+     * This method will be called in the frontend and backend multiple times.
+     * Implementations should bear this in mind and ensure there are no
+     * inconsistent side effects due to the multiple calls.
+     * {@link #checkSchema(ResourceSchema)} will be called before any call to
+     * {@link #setStoreLocation(String, Job)}.
+     *
+     * @param location Location returned by
+     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object
     * @throws IOException if the location is not valid.
     */
-    void setStoreLocation(String location, Job job) throws IOException;
+    public abstract void setStoreLocation(String location, Job job) throws IOException;

    /**
     * Set the schema for data to be stored. This will be called on the
-     * front end during planning. A Store function should implement this function to
+     * front end during planning if the store is associated with a schema.
+     * A Store function should implement this function to
     * check that a given schema is acceptable to it. For example, it
     * can check that the correct partition keys are included;
     * a storage function to be written directly to an OutputFormat can
     * make sure the schema will translate in a well defined way.
     * @param s to be checked
     * @throws IOException if this schema is not acceptable. It should include
     * a detailed error message indicating what is wrong with the schema.
     */
+    @Override
-    void checkSchema(ResourceSchema s) throws IOException;
+    public void checkSchema(ResourceSchema s) throws IOException {
+        // default implementation is a no-op
+    }

    /**
     * Initialize StoreFunc to write data. This will be called during

@@ -516, +541 @@

     * @param writer RecordWriter to use.
     * @throws IOException if an exception occurs during initialization
     */
-    void prepareToWrite(RecordWriter writer) throws IOException;
+    public abstract void prepareToWrite(RecordWriter writer) throws IOException;

    /**
     * Write a tuple to the output stream to which this instance was
     * previously bound.
     *
     * @param t the tuple to store.
     * @throws IOException if an exception occurs during the write
     */
-    void putNext(Tuple t) throws IOException;
+    public abstract void putNext(Tuple t) throws IOException;

+    /**
+     * This method will be called by Pig both in the front end and back end to
+     * pass a unique signature to the {@link StoreFunc} which it can use to store
+     * information in the {@link UDFContext} which it needs to store between
+     * various method invocations in the front end and back end. This method
+     * will be called before other methods in {@link StoreFunc}.
+     * @param signature a unique signature to identify this StoreFunc
+     */
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+        // default implementation is a no-op
+    }
+
+    /**
+     * This method will be called by Pig if the job which contains this store
+     * fails. Implementations can clean up output locations in this method to
+     * ensure that no incorrect/incomplete results are left in the output location.
+     * The implementation in {@link StoreFunc} deletes the output location if it
+     * is a {@link FileSystem} location.
+     * @param location Location returned by
+     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
+     * @param job The {@link Job} object - this should be used only to obtain
+     * cluster properties through {@link Job#getConfiguration()} and not to set/query
+     * any runtime job information.
+     */
+    @Override
+    public void cleanupOnFailure(String location, Job job) throws IOException {
+        cleanupOnFailureImpl(location, job);
+    }
+
+    /**
+     * Default implementation for {@link #cleanupOnFailure(String, Job)}.
+     * @param location the output location to clean up
+     * @param job the {@link Job} object
+     * @throws IOException if the location cannot be deleted
+     */
+    public static void cleanupOnFailureImpl(String location, Job job)
+    throws IOException {
+        FileSystem fs = FileSystem.get(job.getConfiguration());
+        Path path = new Path(location);
+        if (fs.exists(path)) {
+            fs.delete(path, true);
+        }
+    }
}
}}}
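As a worked example of the new abstract class, here is a sketch of a simple storer. The class name and the tab-delimited text format are illustrative only; it overrides just the abstract methods and inherits the default relToAbsPathForStoreLocation, checkSchema, setStoreFuncUDFContextSignature, and cleanupOnFailure. Note that setStoreLocation is safe to call repeatedly, as the javadoc above requires.

{{{
// A sketch only, not part of the proposal: a storer that writes tuples as
// tab-delimited text through Hadoop's TextOutputFormat.
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.data.Tuple;

public class SimpleTextStorer extends StoreFunc {

    private RecordWriter<NullWritable, Text> writer;

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        return new TextOutputFormat<NullWritable, Text>();
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        // Called multiple times on the front end and back end; setting the
        // same output path each time has no inconsistent side effects.
        FileOutputFormat.setOutputPath(job, new Path(location));
    }

    @SuppressWarnings("unchecked")
    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple t) throws IOException {
        try {
            writer.write(NullWritable.get(), new Text(t.toDelimitedString("\t")));
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}
}}}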
'''!StoreMetadata'''

{{{
/**
 * This interface defines how to write metadata related to data to be loaded.
 * If a given store function does not implement this interface, it will be assumed that it
 * is unable to record metadata about the associated data.
 */
-interface StoreMetadata {
+public interface StoreMetadata {

    /**
-     * Set statistics about the data being written.
+     * Store statistics about the data being written.
+     * @param job The {@link Job} object - this should be used only to obtain
+     * cluster properties through {@link Job#getConfiguration()} and not to set/query
+     * any runtime job information.
     * @throws IOException
     */
-    void setStatistics(ResourceStatistics stats, String location, Configuration conf) throws IOException;
+    void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException;

    /**
-     * Set schema of the data being written
+     * Store schema of the data being written.
+     * @param job The {@link Job} object - this should be used only to obtain
+     * cluster properties through {@link Job#getConfiguration()} and not to set/query
+     * any runtime job information.
     * @throws IOException
     */
-    void setSchema(ResourceSchema schema, String location, Configuration conf) throws IOException;
}
}}}

Given the uncertainty noted above under !ResourceStatistics on how statistics should be stored, it is not clear that this interface makes sense.
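Should the interface survive in its current form, an implementation might look like the following sketch. The ".pig_schema" side-file name is hypothetical, not part of the proposal, and statistics storage is deliberately left empty given the open question above.

{{{
// Sketch of the StoreMetadata contract as currently specified. The
// ".pig_schema" side-file name is a hypothetical convention.
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class SimpleTextMetadataStorer implements StoreMetadata {

    @Override
    public void storeStatistics(ResourceStatistics stats, String location, Job job)
            throws IOException {
        // Left unimplemented pending the ResourceStatistics question above.
    }

    @Override
    public void storeSchema(ResourceSchema schema, String location, Job job)
            throws IOException {
        // Persist the schema in a side file next to the stored data.
        FileSystem fs = FileSystem.get(job.getConfiguration());
        OutputStream out = fs.create(new Path(location, ".pig_schema"));
        try {
            out.write(schema.toString().getBytes("UTF-8"));
        } finally {
            out.close();
        }
    }
}
}}}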
