Hi,
All the logs from HadoopStoreStorage are displayed, so I can see that
determineSchema works (this can also be verified with EXPLAIN A;), and the
slice method exits normally. However, none of the logs from the Slice are
shown (even in Hadoop local mode; if I use System.out.println to write to
standard output, no userlog is recorded). I suspect that the Slice methods
are never entered at all. Below is the code for the Slice:
public class HadoopStoreSlice implements Slice {

    private static final long serialVersionUID = 9035916017187148968L;
    private static final Log LOG = LogFactory.getLog(HadoopStoreSlice.class);

    private transient HadoopStoreStorage storeStorage = null;
    private int partitionIndex = 0;
    private String path;

    public HadoopStoreSlice(String path, int partitionIndex) {
        this.path = path;
        this.partitionIndex = partitionIndex;
    }

    @Override
    public void init(DataStorage store) throws IOException {
        LOG.info("Enter init");
        storeStorage = new HadoopStoreStorage();
        storeStorage.bindTo(path, null, partitionIndex, 0);
    }

    public long getStart() {
        LOG.info("Enter getStart");
        //
        // Not sure how to compute this value
        //
        return 0;
    }

    public long getLength() {
        LOG.info("Enter getLength");
        //
        // Not sure how to compute this value
        //
        return 0;
    }

    public String[] getLocations() {
        LOG.info("Enter getLocations");
        try {
            HSSchema tableSchema = new HSSchema(path);
            Object[] src =
                    tableSchema.getPartitionFiles(partitionIndex).toArray();
            LOG.info("Locations:");
            String[] locations = new String[src.length];
            for (int i = 0; i < src.length; i++) {
                locations[i] = (String) src[i];
            }
            LOG.info("Leave getLocations");
            return locations;
        } catch (Exception e) {
            LOG.error(e.toString());
            //
            // If an error occurs (it should not), report the whole
            // directory as the location
            //
            LOG.info("Locations on exception: " + path);
            LOG.info("Leave getLocations");
            return new String[] { path };
        }
    }

    public boolean next(Tuple value) throws IOException {
        LOG.info("Enter next");
        if (storeStorage == null) {
            throw new IOException("Did not bind to HadoopStoreStorage");
        }
        Tuple src = storeStorage.getNext();
        if (src == null) {
            return false;
        }
        value.reference(src);
        LOG.info("Leave next");
        return true;
    }

    public long getPos() throws IOException {
        LOG.info("Enter getPos");
        return 0;
    }

    public void close() throws IOException {
        LOG.info("Enter close");
    }

    public float getProgress() throws IOException {
        LOG.info("Enter getProgress");
        // TODO: let the loader decide the progress
        return 0;
    }

    public void readFields(DataInput is) throws IOException {
        partitionIndex = is.readInt();
        path = is.readUTF();
    }

    public void write(DataOutput os) throws IOException {
        os.writeInt(partitionIndex);
        os.writeUTF(path);
    }
}
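One thing I am now checking on my side: if I understand correctly, in map-reduce mode the Slice objects are serialized and shipped to the tasks, so if a slice fails to serialize (or silently loses state, e.g. through a field wrongly marked transient), the Slice methods would never run on the cluster side at all; the task-side LOG output would also end up in the task userlogs rather than on the client console. Below is a minimal round-trip sketch of that check. FakeSlice is a hypothetical stand-in with the same field layout as my slice (a String path plus an int partition index), not a real Pig class, since the real HadoopStoreSlice has dependencies that would not compile standalone:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for HadoopStoreSlice; used only to illustrate
// the serialization round-trip check, not a real Pig class.
class FakeSlice implements Serializable {
    private static final long serialVersionUID = 1L;
    // NOTE: marking one of these fields transient would silently drop it
    // in transit, which is exactly the kind of bug this check surfaces.
    private final String path;
    private final int partitionIndex;

    FakeSlice(String path, int partitionIndex) {
        this.path = path;
        this.partitionIndex = partitionIndex;
    }

    String describe() {
        return path + ":" + partitionIndex;
    }
}

public class SliceSerializationCheck {

    // Serialize the slice and read it back, mimicking what happens when
    // a slice is shipped from the client to a task.
    static String roundTrip(FakeSlice original) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(original);
        oos.flush();

        ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        FakeSlice copy = (FakeSlice) ois.readObject();
        return copy.describe();
    }

    public static void main(String[] args) throws Exception {
        FakeSlice slice = new FakeSlice("/tables/demo", 3);
        // If this throws, or the fields come back wrong, the real slice
        // would never survive the trip to a map task either.
        System.out.println(roundTrip(slice));
    }
}
```

If this round-trip throws or returns corrupted fields for the real slice class, that would explain why the Slice methods are never entered.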
Thanks,
Richard
> Date: Mon, 26 Oct 2009 16:15:53 -0400
> Subject: Re: Custom Loadfunc problem!
> From: [email protected]
> To: [email protected]
>
> Do you get any of your Log messages to come out, or none at all?
>
> -D
>
> 2009/10/26 RichardGUO Fei <[email protected]>:
> >
> > Hi,
> >
> >
> >
> > This is the rough source code of the slicer/loadfunc:
> >
> >
> >
> > public class HadoopStoreStorage extends Utf8StorageConverter
> >         implements LoadFunc, Slicer {
> >
> >     private static final Log LOG = LogFactory.getLog(HadoopStoreStorage.class);
> >
> >     private transient HSSchema tableSchema = null;
> >     private transient HSTupleFactory tupleFactory = null;
> >     private int partitionIndex = 0;
> >
> >     //
> >     // Table directory in HDFS
> >     //
> >     private String location;
> >
> >     public Tuple getNext() throws IOException {
> >         if (tupleFactory == null)
> >             return null;
> >
> >         try {
> >             return tupleFactory.getNextTuple();
> >         } catch (Exception exp) {
> >             System.out.println(exp);
> >             exp.printStackTrace();
> >             throw new IOException("next failure.");
> >         }
> >     }
> >
> >     public void bindTo(String fileName, BufferedPositionedInputStream in,
> >             long offset, long end) throws IOException {
> >         LOG.info("Enter bindTo");
> >         System.out.println("Enter bindTo");
> >
> >         location = fileName;
> >         partitionIndex = (int) offset;
> >
> >         if (this.tableSchema == null)
> >             this.tableSchema = new HSSchema(location);
> >         this.tupleFactory = new HSTupleFactory(tableSchema, partitionIndex);
> >
> >         LOG.info("Leave bindTo");
> >     }
> >
> >     public void finish() throws IOException {
> >         LOG.info("Enter finish");
> >         tupleFactory.close();
> >     }
> >
> >     public Schema determineSchema(String fileName, ExecType execType,
> >             DataStorage storage) throws IOException {
> >         LOG.info("Enter determineSchema");
> >
> >         //
> >         // This method should be implemented in HadoopStore, as the meta
> >         // info of a table is already available at compile time
> >         //
> >
> >         //
> >         // Make sure that the table schema is available
> >         //
> >         if (tableSchema == null) {
> >             tableSchema = new HSSchema(fileName);
> >         }
> >
> >         try {
> >             return tableSchema.toPigSchema();
> >         } catch (FrontendException e) {
> >             System.out.println(e.toString());
> >             throw new IOException("FrontendException encountered.");
> >         }
> >     }
> >
> >     public void fieldsToRead(Schema schema) {
> >         LOG.info("Enter fieldsToRead");
> >
> >         try {
> >             tupleFactory.setFieldsToRead(schema);
> >         } catch (FrontendException e) {
> >             System.out.println(e.toString());
> >         }
> >     }
> >
> >     //
> >     // Slicer methods
> >     //
> >
> >     //
> >     // Create a list of partitions, one slice per partition
> >     //
> >     public Slice[] slice(DataStorage store, String location)
> >             throws IOException {
> >         LOG.info("Enter slice");
> >
> >         //
> >         // Validate the location
> >         //
> >         validate(store, location);
> >
> >         this.location = location;
> >
> >         //
> >         // Load the schema from the location
> >         //
> >         if (tableSchema == null)
> >             tableSchema = new HSSchema(location);
> >
> >         List<HadoopStoreSlice> newSlices = new ArrayList<HadoopStoreSlice>();
> >
> >         //
> >         // Retrieve the number of horizontal table partitions
> >         //
> >         int numberOfPartitions = tableSchema.getPartitionNumber();
> >
> >         for (int i = 0; i < numberOfPartitions; i++) {
> >             newSlices.add(new HadoopStoreSlice(location, i));
> >         }
> >
> >         Slice[] slices = newSlices.toArray(new Slice[newSlices.size()]);
> >
> >         LOG.info("Leave slice");
> >         return slices;
> >     }
> >
> >     public void validate(DataStorage store, String location)
> >             throws IOException {
> >         LOG.info("Enter validate");
> >
> >         if (!FileLocalizer.isDirectory(location, store)
> >                 || !FileLocalizer.fileExists(location
> >                         + GlobalParameter.TABLE_SCHEMA_RELATIVE_PATH, store)) {
> >             int errCode = 2100;
> >             String msg = location + " is not a valid HadoopStore table directory.";
> >             throw new ExecException(msg, errCode, PigException.BUG);
> >         }
> >
> >         LOG.info("Leave validate");
> >     }
> >
> > }
> >
> >
> >
> > HSSchema and HSTupleFactory implement a storage layer on top of HDFS. Please
> > let me know if you see any problems.
> >
> >
> >
> > I really appreciate your help!
> >
> >
> >
> > Thanks,
> >
> > Richard
> >
> >
> >> From: [email protected]
> >> Subject: RE: Custom Loadfunc problem!
> >> Date: Mon, 26 Oct 2009 10:33:21 -0400
> >> To: [email protected]; [email protected]
> >>
> >> Jeff,
> >> Slicers don't work in local mode; there is an ancient ticket for that in
> >> Jira.
> >>
> >> Richard -- hard to say what's going on without more code. Think you can
> >> come up with a simplified version of your loadfunc that fails in a similar
> >> manner, and share it?
> >>
> >>
> >>
> >> -----Original Message-----
> >> From: "zjffdu" <[email protected]>
> >> To: [email protected]
> >> Sent: 10/27/2009 1:45 AM
> >> Subject: RE: Custom Loadfunc problem!
> >>
> >> Illustrate will not execute the job, while dump and store will execute it.
> >> So I think there must be something wrong with your custom slicer. I suggest
> >> you set a breakpoint in your slicer and debug it in map-reduce mode locally.
> >>
> >>
> >>
> >> -----Original Message-----
> >> From: RichardGUO Fei [mailto:[email protected]]
> >> Sent: 20091026 0:43
> >> To: [email protected]
> >> Subject: RE: Custom Loadfunc problem!
> >>
> >>
> >> Hi,
> >>
> >>
> >>
> >> Btw, my program works with ILLUSTRATE but n
> >>
> >> [truncated by sender]
> >