Hi,

 

This is the rough source codes of the slicer/loadfunc:

 

public class HadoopStoreStorage extends Utf8StorageConverter
    implements LoadFunc, Slicer {

    private static final Log LOG = LogFactory.getLog(HadoopStoreStorage.class);
    
    private transient HSSchema tableSchema = null;
    private transient HSTupleFactory tupleFactory = null;
    private int partitionIndex = 0;
    
    //
    // Table directory in HDFS
    //
    private String location;
            
    public Tuple getNext() throws IOException {
         if(tupleFactory == null)
            return null;

        try {
            Tuple tuple = tupleFactory.getNextTuple();
            return tuple;
        } catch(Exception exp) {
            System.out.println(exp);
            exp.printStackTrace();
            throw new IOException("next failure.");
        }
    }

    public void bindTo(String fileName, BufferedPositionedInputStream in, long 
offset, long end) throws IOException {
        LOG.info("Enter bindTo");
        System.out.println("Enter bindTo");

        location = fileName;
        partitionIndex = (int) offset;

        if(this.tableSchema == null)
            this.tableSchema = new HSSchema(location);
        this.tupleFactory = new HSTupleFactory(tableSchema, partitionIndex);

        LOG.info("Leave bindTo");
    }

    public void finish() throws IOException {
        LOG.info("Enter finish");
        tupleFactory.close();
    }

    public Schema determineSchema(String fileName, ExecType execType, 
DataStorage storage) throws IOException {
        LOG.info("Enter determineSchema");
        
        //
        // This method should be implemented in HadoopStore as at compile time, 
the meta info of a table is already available
        //

        //
        // Make sure that table schema is available
        //
        if(tableSchema == null) {
            tableSchema = new HSSchema(fileName);
        }

        try {
            return tableSchema.toPigSchema();
        } catch(FrontendException e) {
            System.out.println(e.toString());
            throw new IOException("FrontendException encountered.");
        }
    }

    public void fieldsToRead(Schema schema) {
        LOG.info("Enter fieldsToRead");

        try {
            tupleFactory.setFieldsToRead(schema);
        } catch(FrontendException e) {
            System.out.println(e.toString());
        }
    }


    //
    // Slicer methods
    //

    //
    // Create a list of partitions
    //
    public Slice[] slice(DataStorage store, String location) throws IOException 
{
        LOG.info("Enter slice");
        
      //
      // Validate the location
      //
        validate(store, location);

        this.location = location;
        //
        // Load the schema from the location
        //
        if(tableSchema == null)
            tableSchema = new HSSchema(location);
        
        List<HadoopStoreSlice> newSlices = new ArrayList<HadoopStoreSlice>();

        //
        // Retrieve the number of table horizontal partitions
        //
        int NumberOfPartitions = tableSchema.getPartitionNumber();

        for(int i = 0; i < NumberOfPartitions; i ++ ) {
            newSlices.add(new HadoopStoreSlice(location, i));
        }

       Slice[] slices = new Slice [newSlices.size()];
       for(int i = 0; i < newSlices.size(); i ++ ) {
            slices[i] = (Slice) newSlices.get(i);
       }

        LOG.info("Leave slice");
 return slices;
    }

    public void validate(DataStorage store, String location) throws IOException 
{
        LOG.info("Enter validate");
        
        if (!FileLocalizer.isDirectory(location, store)
         || !FileLocalizer.fileExists(location + 
GlobalParameter.TABLE_SCHEMA_RELATIVE_PATH, store)) {
            int errCode = 2100;
            String msg = location + " is not a valid HadoopStore table 
directory.";
            throw new ExecException(msg, errCode, PigException.BUG);
        }

        LOG.info("Leave validate");
    }

 }

 

HSSchema and HSTupleFactory build up a storage based on HDFS. Please let me 
know if you see any problem.

 

I really appreciate your help!

 

Thanks,

Richard

 
> From: dvrya...@gmail.com
> Subject: RE: Custom Loadfunc problem!
> Date: Mon, 26 Oct 2009 10:33:21 -0400
> To: pig-dev@hadoop.apache.org; pig-dev@hadoop.apache.org
> 
> Jeff,
> Slicers dont work in local mode, there is an ancient ticket for that on the 
> Jira.
> 
> Richard -- hard to say whats going on without more code. Think you can come 
> up with a simplified version of your loadfunc that fails in a similar manner, 
> and share it?
> 
> 
> 
> -----Original Message-----
> From: "zjffdu" <zjf...@gmail.com>
> To: pig-dev@hadoop.apache.org
> Sent: 10/27/2009 1:45 AM
> Subject: RE: Custom Loadfunc problem!
> 
> Illustrate will not execute the job, while dump and store will execute it.
> So I think there must be something wrong with your custom slicer. I suggest
> you set breakpoint in your slicer and debug it in map reduce mode locally
> 
> 
> 
> -----Original Message-----
> From: RichardGUO Fei [mailto:gladiato...@hotmail.com] 
> Sent: 20091026 0:43
> To: pig-dev@hadoop.apache.org
> Subject: RE: Custom Loadfunc problem!
> 
> 
> Hi,
> 
> 
> 
> Btw, my program works with ILLUSTRATE but n
> 
> [truncated by sender]
                                          
_________________________________________________________________
全新 Windows 7:寻找最适合您的 PC。了解详情。
http://www.microsoft.com/china/windows/buy/ 

Reply via email to