Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The "LoadStoreRedesignProposal" page has been changed by PradeepKamath.
http://wiki.apache.org/pig/LoadStoreRedesignProposal?action=diff&rev1=41&rev2=42

--------------------------------------------------

  public interface LoadMetadata {
  
      /**
-      * Get a schema for the data to be loaded.
+      * Get a schema for the data to be loaded.  
-      * @param location Location as returned by
+      * @param location Location as returned by 
-      * {@link LoadFunc#relativeToAbsolutePath(String, String)}
+      * {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
-      * @param conf The {@link Configuration} object
+      * @param job The {@link Job} object - this should be used only to obtain
+      * cluster properties through {@link Job#getConfiguration()} and not to set/query
+      * any runtime job information.
       * @return schema for the data to be loaded. This schema should represent
       * all tuples of the returned data.  If the schema is unknown or it is
       * not possible to return a schema that represents all returned data,
       * then null should be returned.
       * @throws IOException if an exception occurs while determining the schema
       */
-     ResourceSchema getSchema(String location, Configuration conf) throws
+     ResourceSchema getSchema(String location, Job job) throws 
      IOException;
  
      /**
       * Get statistics about the data to be loaded.  If no statistics are
       * available, then null should be returned.
-      * @param location Location as returned by
+      * @param location Location as returned by 
-      * {@link LoadFunc#relativeToAbsolutePath(String, String)}
+      * {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
-      * @param conf The {@link Configuration} object
+      * @param job The {@link Job} object - this should be used only to obtain
+      * cluster properties through {@link Job#getConfiguration()} and not to set/query
+      * any runtime job information.
       * @return statistics about the data to be loaded.  If no statistics are
       * available, then null should be returned.
       * @throws IOException if an exception occurs while retrieving statistics
       */
-     ResourceStatistics getStatistics(String location, Configuration conf)
+     ResourceStatistics getStatistics(String location, Job job) 
      throws IOException;
  
      /**
       * Find what columns are partition keys for this input.
-      * @param location Location as returned by
+      * @param location Location as returned by 
-      * {@link LoadFunc#relativeToAbsolutePath(String, String)}
+      * {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
-      * @param conf The {@link Configuration} object
+      * @param job The {@link Job} object - this should be used only to obtain
+      * cluster properties through {@link Job#getConfiguration()} and not to set/query
+      * any runtime job information.
-      * @return array of field names of the partition keys.
+      * @return array of field names of the partition keys. Implementations
+      * should return null to indicate that there are no partition keys.
       * @throws IOException if an exception occurs while retrieving partition keys
       */
-     String[] getPartitionKeys(String location, Configuration conf)
+     String[] getPartitionKeys(String location, Job job) 
      throws IOException;
  
      /**
       * Set the filter for partitioning.  It is assumed that this filter
       * will only contain references to fields given as partition keys in
-      * getPartitionKeys
+      * getPartitionKeys. So if the implementation returns null in
+      * {@link #getPartitionKeys(String, Job)}, then this method is not
+      * called by the Pig runtime. This method is also not called by the Pig runtime
+      * if there are no partition filter conditions.
-      * @param plan that describes filter for partitioning
+      * @param partitionFilter expression that describes the filter for partitioning
       * @throws IOException if the filter is not compatible with the storage
       * mechanism or contains non-partition fields.
       */
-     void setParitionFilter(OperatorPlan plan) throws IOException;
+     void setPartitionFilter(Expression partitionFilter) throws IOException;
  
  }
  }}}
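
To make the contract concrete, here is a minimal sketch of what an implementation of !LoadMetadata might look like. The class name, the "date" partition key and the org.apache.pig package locations are illustrative assumptions; only the method signatures come from the interface above.

{{{
// Hypothetical sketch only - class name, the "date" partition key and the
// package locations are assumptions, not part of the proposal.
import java.io.IOException;

import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.Expression;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;

public class DatePartitionedMetadata implements LoadMetadata {

    private Expression partitionFilter;

    @Override
    public ResourceSchema getSchema(String location, Job job) throws IOException {
        // Schema is unknown in this sketch; per the contract above, returning
        // null tells Pig that no schema can be provided up front.
        return null;
    }

    @Override
    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
        // No statistics are available.
        return null;
    }

    @Override
    public String[] getPartitionKeys(String location, Job job) throws IOException {
        // Data is assumed to be laid out under .../date=YYYYMMDD directories,
        // so "date" is the single partition key.
        return new String[] { "date" };
    }

    @Override
    public void setPartitionFilter(Expression partitionFilter) throws IOException {
        // Remember the filter; the loader (not shown) would use it to prune
        // the date directories that actually need to be read.
        this.partitionFilter = partitionFilter;
    }
}
}}}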
@@ -448, +458 @@

  
  {{{
  /**
- * This interface is used to implement functions to write records
+ * This abstract class is used to implement functions to write records
  * from a dataset.
+ * 
+ *
  */
  
- public interface StoreFunc {
+ public abstract class StoreFunc implements StoreFuncInterface {
  
      /**
      * This method is called by the Pig runtime in the front end to convert the
      * output location to an absolute path if the location is relative. The
-      * StoreFunc implementation is free to choose how it converts a relative
+      * StoreFunc implementation is free to choose how it converts a relative
      * location to an absolute location since this may depend on what the location
-      * string represent (hdfs path or some other data source)
+      * string represents (hdfs path or some other data source).
+      *  
-      *
+      * 
      * @param location location as provided in the "store" statement of the script
      * @param curDir the current working directory based on any "cd" statements
      * in the script before the "store" statement. If there are no "cd" statements
-      * in the script, this would be the home directory -
+      * in the script, this would be the home directory -
       * <pre>/user/<username> </pre>
       * @return the absolute location based on the arguments passed
+      * @throws IOException 
       * @throws IOException if the conversion is not possible
       */
+     @Override
-     String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException;
+     public String relToAbsPathForStoreLocation(String location, Path curDir) 
+     throws IOException {
+         return LoadFunc.getAbsolutePath(location, curDir);
+     }
  
      /**
       * Return the OutputFormat associated with StoreFunc.  This will be called
       * on the front end during planning and not on the backend during
-      * execution.
+      * execution. 
      * @return the {@link OutputFormat} associated with StoreFunc
-      * @throws IOException if an exception occurs while constructing the
+      * @throws IOException if an exception occurs while constructing the 
       * OutputFormat
       *
       */
-     OutputFormat getOutputFormat() throws IOException;
+     public abstract OutputFormat getOutputFormat() throws IOException;
  
      /**
-      * Communicate to the store function the location used in Pig Latin to refer
-      * to the object(s) being stored.  That is, if the PL script is
-      * <b>store A into 'bla'</b>
-      * then 'bla' is the location.  This location should be either a file name
-      * or a URI.  If it does not have a URI scheme Pig will assume it is a
-      * filename.  This will be
-      * called during planning on the front end, not during execution on
-      * the backend.
-      * @param location Location indicated in store statement.
+      * Communicate to the storer the location where the data needs to be stored.
+      * The location string passed to the {@link StoreFunc} here is the
+      * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}.
+      * This method will be called in the frontend and backend multiple times. Implementations
+      * should bear in mind that this method is called multiple times and should
+      * ensure there are no inconsistent side effects due to the multiple calls.
+      * {@link #checkSchema(ResourceSchema)} will be called before any call to
+      * {@link #setStoreLocation(String, Job)}.
+      *
+      * @param location Location returned by
+      * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
      * @param job The {@link Job} object
       * @throws IOException if the location is not valid.
       */
-     void setStoreLocation(String location, Job job) throws IOException;
+     public abstract void setStoreLocation(String location, Job job) throws IOException;
- 
+  
      /**
       * Set the schema for data to be stored.  This will be called on the
+      * front end during planning if the store is associated with a schema.
-      * front end during planning. A Store function should implement this function to
+      * A Store function should implement this function to
       * check that a given schema is acceptable to it.  For example, it
       * can check that the correct partition keys are included;
       * a storage function to be written directly to an OutputFormat can
-      * make sure the schema will translate in a well defined way.
+      * make sure the schema will translate in a well defined way.  
       * @param s to be checked
      * @throws IOException if this schema is not acceptable.  It should include
       * a detailed error message indicating what is wrong with the schema.
       */
+     @Override
-     void checkSchema(ResourceSchema s) throws IOException;
+     public void checkSchema(ResourceSchema s) throws IOException {
+         // default implementation is a no-op
+     }
  
      /**
       * Initialize StoreFunc to write data.  This will be called during
@@ -516, +541 @@

       * @param writer RecordWriter to use.
       * @throws IOException if an exception occurs during initialization
       */
-     void prepareToWrite(RecordWriter writer) throws IOException;
+     public abstract void prepareToWrite(RecordWriter writer) throws IOException;
  
      /**
      * Write a tuple to the output stream to which this instance was
       * previously bound.
-      *
+      * 
       * @param t the tuple to store.
       * @throws IOException if an exception occurs during the write
       */
-     void putNext(Tuple t) throws IOException;
+     public abstract void putNext(Tuple t) throws IOException;
- 
+     
+     /**
+      * This method will be called by Pig both in the front end and back end to
+      * pass a unique signature to the {@link StoreFunc} which it can use to store
+      * information in the {@link UDFContext} which it needs to store between
+      * various method invocations in the front end and back end. This method
+      * will be called before other methods in {@link StoreFunc}.
+      * @param signature a unique signature to identify this StoreFunc
+      */
+     @Override
+     public void setStoreFuncUDFContextSignature(String signature) {
+         // default implementation is a no-op
+     }
+     
+     /**
+      * This method will be called by Pig if the job which contains this store
+      * fails. Implementations can clean up output locations in this method to
+      * ensure that no incorrect/incomplete results are left in the output location.
+      * The implementation in {@link StoreFunc} deletes the output location if it
+      * is a {@link FileSystem} location.
+      * @param location Location returned by
+      * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
+      * @param job The {@link Job} object - this should be used only to obtain
+      * cluster properties through {@link Job#getConfiguration()} and not to set/query
+      * any runtime job information.
+      */
+     @Override
+     public void cleanupOnFailure(String location, Job job) 
+     throws IOException {
+         cleanupOnFailureImpl(location, job);
+     }
+     
+     /**
+      * Implementation for {@link #cleanupOnFailure(String, Job)}
+      * @param location
+      * @param job
+      * @throws IOException
+      */
+     public static void cleanupOnFailureImpl(String location, Job job) 
+     throws IOException {
+         FileSystem fs = FileSystem.get(job.getConfiguration());
+         Path path = new Path(location);
+         if(fs.exists(path)){
+             fs.delete(path, true);
+         }    
+     }
  }
  }}}
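
As a rough illustration of the abstract class above, here is a sketch of a minimal concrete storer. The class name SimpleTextStorer, the tab-delimited output and the package locations are assumptions made for the example; only the overridden method signatures come from the !StoreFunc definition above. The inherited defaults (relToAbsPathForStoreLocation, checkSchema, cleanupOnFailure and the UDFContext signature no-op) are left untouched.

{{{
// Hypothetical sketch only - class name, the tab-delimited format and the
// package locations are assumptions, not part of the proposal.
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;

public class SimpleTextStorer extends StoreFunc {

    private RecordWriter<NullWritable, Text> writer;

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        // Hadoop's TextOutputFormat writes each value out as a line of text.
        return new TextOutputFormat<NullWritable, Text>();
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        // location is the absolute path produced by relToAbsPathForStoreLocation.
        FileOutputFormat.setOutputPath(job, new Path(location));
    }

    @SuppressWarnings("unchecked")
    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple t) throws IOException {
        // Render the tuple as a tab-delimited line; a real storer would also
        // handle nested types and escaping.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < t.size(); i++) {
            if (i > 0) {
                sb.append('\t');
            }
            Object field = t.get(i);
            sb.append(field == null ? "" : field.toString());
        }
        try {
            writer.write(NullWritable.get(), new Text(sb.toString()));
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}
}}}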
  '''!StoreMetadata'''
  
  {{{
-  /**
+ /**
   * This interface defines how to write metadata related to data to be loaded.
  * If a given store function does not implement this interface, it will be assumed that it
   * is unable to record metadata about the associated data.
   */
+ 
- interface StoreMetadata {
+ public interface StoreMetadata {
  
      /**
-      * Set statistics about the data being written.
+      * Store statistics about the data being written.
+      * @param job The {@link Job} object - this should be used only to obtain
+      * cluster properties through {@link Job#getConfiguration()} and not to set/query
+      * any runtime job information.
-      * @throws IOException
+      * @throws IOException
       */
-     void setStatistics(ResourceStatistics stats, String location, Configuration conf) throws IOException;
+     void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException;
  
      /**
-      * Set schema of the data being written
+      * Store schema of the data being written
+      * @param job The {@link Job} object - this should be used only to obtain
+      * cluster properties through {@link Job#getConfiguration()} and not to set/query
+      * any runtime job information.
-      * @throws IOException
+      * @throws IOException
       */
-     void setSchema(ResourceSchema schema, String location, Configuration conf) throws IOException;
+     void storeSchema(ResourceSchema schema, String location, Job job) throws IOException;
- 
  }
  }}}
  Given the uncertainty noted above under !ResourceStatistics on how statistics should be stored, it is not clear that this interface makes sense.
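
For illustration, a sketch of how a storer might use !StoreMetadata to record the schema as a side file while leaving statistics unimplemented, given that uncertainty. The ".schema" file name and the use of ResourceSchema.toString() as its contents are assumptions; only the method signatures come from the interface above.

{{{
// Hypothetical sketch only - the ".schema" side-file name and the use of
// ResourceSchema.toString() as its contents are assumptions for illustration.
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreMetadata;

public class SideFileStoreMetadata implements StoreMetadata {

    @Override
    public void storeStatistics(ResourceStatistics stats, String location, Job job)
            throws IOException {
        // Left unimplemented given the uncertainty noted above about how
        // statistics should be stored.
    }

    @Override
    public void storeSchema(ResourceSchema schema, String location, Job job)
            throws IOException {
        // Write a human-readable schema file alongside the stored data.
        FileSystem fs = FileSystem.get(job.getConfiguration());
        Writer out = new OutputStreamWriter(fs.create(new Path(location, ".schema"), true));
        try {
            out.write(schema.toString());
        } finally {
            out.close();
        }
    }
}
}}}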
