Hi,

 

All the logs from HadoopStoreStorage are displayed, so I can see that 
determineSchema works (this can also be verified by running EXPLAIN A;) and 
that the slice method exits normally. However, none of the logs from the Slice 
are shown (even in Hadoop local mode, and if I use System.out.println to write 
to standard output, no userlog is recorded). I suspect that the Slice methods 
are never entered at all. Below is the code for the Slice:

 

public class HadoopStoreSlice implements Slice {

    private static final long serialVersionUID = 9035916017187148968L;
    
    private static final Log LOG = LogFactory.getLog(HadoopStoreSlice.class);

    private transient HadoopStoreStorage storeStorage = null;
    private int partitionIndex = 0;
    private String path;

    public HadoopStoreSlice(String path, int partitionIndex) {
        this.path = path;
        this.partitionIndex = partitionIndex;
    }

    @Override
    public void init(DataStorage store) throws IOException {
        LOG.info("Enter init");
        storeStorage = new HadoopStoreStorage();
        storeStorage.bindTo(path, null, partitionIndex, 0);
    }

    public long getStart() {
        LOG.info("Enter getStart");
        //
        // Don't know how to compute this value
        //
        return 0;
    }
    
    public long getLength() {
        LOG.info("Enter getLength");
        //
        // Don't know how to compute this value
        //
        return 0;
    }

    public String[] getLocations() {
        LOG.info("Enter getLocations");

        try {
            HSSchema tableSchema = new HSSchema(path);

            Object[] src = tableSchema.getPartitionFiles(partitionIndex).toArray();
            LOG.info("Locations:");
            String[] locations = new String[src.length];
            for(int i = 0; i < src.length; i ++ ) {
                locations[i] = (String) src[i];
            }
            LOG.info("Leave getLocations");
            return locations;
        } catch(Exception e) {
            LOG.error(e.toString());
            //
            // If error occurs, report the location as a whole directory
            // (it should not occur)
            //
            LOG.info("Leave getLocations");
            LOG.info("Locations on exception: " + path);
            return new String[] { path };
        }
    }

    public boolean next(Tuple value) throws IOException {
        LOG.info("Enter next");

        if(storeStorage == null) {
            throw new IOException("Did not bind to HadoopStoreStorage");
        }

        Tuple src = storeStorage.getNext();

        LOG.info("Leave next");

        //
        // getNext() returns null once the partition is exhausted, so check
        // before referencing it in the output tuple
        //
        if(src == null) {
            return false;
        }
        value.reference(src);
        return true;
    }
    
    public long getPos() throws IOException {
        LOG.info("Enter getPos");
        return 0;
    }

    public void close() throws IOException {
        LOG.info("Enter close");
    }

    public float getProgress() throws IOException {
        LOG.info("Enter getProgress");
        // TODO: let the loader decide the progress
        return 0;
    }

    public void readFields(DataInput is) throws IOException {
        partitionIndex = is.readInt();
        path = is.readUTF();
    }

    public void write(DataOutput os) throws IOException {
        os.writeInt(partitionIndex);
        os.writeUTF(path);
    }
}
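
In case it is useful, this is a rough standalone check I am thinking of running 
outside of Pig to rule out the Slice itself: it just round-trips the slice 
through its own write()/readFields() methods and then drives init/next/close by 
hand (only a sketch; it assumes the class sits next to HadoopStoreSlice, and 
the table path below is made up):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class HadoopStoreSliceCheck {

    public static void main(String[] args) throws Exception {
        // Hypothetical table directory; would be replaced with a real HadoopStore table
        String path = "/user/richard/hadoopstore/table1";

        // Round-trip the slice through its own write()/readFields() methods
        HadoopStoreSlice original = new HadoopStoreSlice(path, 0);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bos));

        HadoopStoreSlice copy = new HadoopStoreSlice(null, 0);
        copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bos.toByteArray())));

        // Drive the lifecycle directly; init() above ignores the DataStorage argument
        copy.init(null);
        Tuple t = TupleFactory.getInstance().newTuple();
        while (copy.next(t)) {
            System.out.println(t);
        }
        copy.close();
    }
}

If that runs cleanly, the problem is probably in how the job picks up the 
slices rather than in the Slice code itself.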

 

Thanks,

Richard
 
> Date: Mon, 26 Oct 2009 16:15:53 -0400
> Subject: Re: Custom Loadfunc problem!
> From: dvrya...@gmail.com
> To: pig-dev@hadoop.apache.org
> 
> Do you get any of your Log messages to come out, or none at all?
> 
> -D
> 
> 2009/10/26 RichardGUO Fei <gladiato...@hotmail.com>:
> >
> > Hi,
> >
> >
> >
> > This is the rough source codes of the slicer/loadfunc:
> >
> >
> >
> > public class HadoopStoreStorage extends Utf8StorageConverter
> >         implements LoadFunc, Slicer {
> >
> >     private static final Log LOG = LogFactory.getLog(HadoopStoreStorage.class);
> >
> >     private transient HSSchema tableSchema = null;
> >     private transient HSTupleFactory tupleFactory = null;
> >     private int partitionIndex = 0;
> >
> >     //
> >     // Table directory in HDFS
> >     //
> >     private String location;
> >
> >     public Tuple getNext() throws IOException {
> >         if(tupleFactory == null)
> >             return null;
> >
> >         try {
> >             Tuple tuple = tupleFactory.getNextTuple();
> >             return tuple;
> >         } catch(Exception exp) {
> >             System.out.println(exp);
> >             exp.printStackTrace();
> >             throw new IOException("next failure.");
> >         }
> >     }
> >
> >     public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
> >         LOG.info("Enter bindTo");
> >         System.out.println("Enter bindTo");
> >
> >         location = fileName;
> >         partitionIndex = (int) offset;
> >
> >         if(this.tableSchema == null)
> >             this.tableSchema = new HSSchema(location);
> >         this.tupleFactory = new HSTupleFactory(tableSchema, partitionIndex);
> >
> >         LOG.info("Leave bindTo");
> >     }
> >
> >     public void finish() throws IOException {
> >         LOG.info("Enter finish");
> >         tupleFactory.close();
> >     }
> >
> >     public Schema determineSchema(String fileName, ExecType execType, DataStorage storage) throws IOException {
> >         LOG.info("Enter determineSchema");
> >
> >         //
> >         // This method should be implemented in HadoopStore as at compile time, the meta info of a table is already available
> >         //
> >
> >         //
> >         // Make sure that table schema is available
> >         //
> >         if(tableSchema == null) {
> >             tableSchema = new HSSchema(fileName);
> >         }
> >
> >         try {
> >             return tableSchema.toPigSchema();
> >         } catch(FrontendException e) {
> >             System.out.println(e.toString());
> >             throw new IOException("FrontendException encountered.");
> >         }
> >     }
> >
> >     public void fieldsToRead(Schema schema) {
> >         LOG.info("Enter fieldsToRead");
> >
> >         try {
> >             tupleFactory.setFieldsToRead(schema);
> >         } catch(FrontendException e) {
> >             System.out.println(e.toString());
> >         }
> >     }
> >
> >
> >     //
> >     // Slicer methods
> >     //
> >
> >     //
> >     // Create a list of partitions
> >     //
> >     public Slice[] slice(DataStorage store, String location) throws IOException {
> >         LOG.info("Enter slice");
> >
> >         //
> >         // Validate the location
> >         //
> >         validate(store, location);
> >
> >         this.location = location;
> >         //
> >         // Load the schema from the location
> >         //
> >         if(tableSchema == null)
> >             tableSchema = new HSSchema(location);
> >
> >         List<HadoopStoreSlice> newSlices = new ArrayList<HadoopStoreSlice>();
> >
> >         //
> >         // Retrieve the number of table horizontal partitions
> >         //
> >         int NumberOfPartitions = tableSchema.getPartitionNumber();
> >
> >         for(int i = 0; i < NumberOfPartitions; i ++ ) {
> >             newSlices.add(new HadoopStoreSlice(location, i));
> >         }
> >
> >         Slice[] slices = new Slice [newSlices.size()];
> >         for(int i = 0; i < newSlices.size(); i ++ ) {
> >             slices[i] = (Slice) newSlices.get(i);
> >         }
> >
> >         LOG.info("Leave slice");
> >         return slices;
> >     }
> >
> >     public void validate(DataStorage store, String location) throws IOException {
> >         LOG.info("Enter validate");
> >
> >         if (!FileLocalizer.isDirectory(location, store)
> >                 || !FileLocalizer.fileExists(location + GlobalParameter.TABLE_SCHEMA_RELATIVE_PATH, store)) {
> >             int errCode = 2100;
> >             String msg = location + " is not a valid HadoopStore table directory.";
> >             throw new ExecException(msg, errCode, PigException.BUG);
> >         }
> >
> >         LOG.info("Leave validate");
> >     }
> >
> > }
> >
> >
> >
> > HSSchema and HSTupleFactory build a storage layer on top of HDFS. Please let me
> > know if you see any problem.
> >
> >
> >
> > I really appreciate your help!
> >
> >
> >
> > Thanks,
> >
> > Richard
> >
> >
> >> From: dvrya...@gmail.com
> >> Subject: RE: Custom Loadfunc problem!
> >> Date: Mon, 26 Oct 2009 10:33:21 -0400
> >> To: pig-dev@hadoop.apache.org; pig-dev@hadoop.apache.org
> >>
> >> Jeff,
> >> Slicers don't work in local mode; there is an ancient ticket for that on
> >> the Jira.
> >>
> >> Richard -- hard to say what's going on without more code. Think you can
> >> come up with a simplified version of your loadfunc that fails in a similar
> >> manner, and share it?
> >>
> >>
> >>
> >> -----Original Message-----
> >> From: "zjffdu" <zjf...@gmail.com>
> >> To: pig-dev@hadoop.apache.org
> >> Sent: 10/27/2009 1:45 AM
> >> Subject: RE: Custom Loadfunc problem!
> >>
> >> ILLUSTRATE will not execute the job, while DUMP and STORE will execute it.
> >> So I think there must be something wrong with your custom slicer. I suggest
> >> you set a breakpoint in your slicer and debug it in map-reduce mode locally.
> >>
> >>
> >>
> >> -----Original Message-----
> >> From: RichardGUO Fei [mailto:gladiato...@hotmail.com]
> >> Sent: 20091026 0:43
> >> To: pig-dev@hadoop.apache.org
> >> Subject: RE: Custom Loadfunc problem!
> >>
> >>
> >> Hi,
> >>
> >>
> >>
> >> Btw, my program works with ILLUSTRATE but n
> >>
> >> [truncated by sender]
> >
