Sorry, some mistakes. This is the AvroInputFormat part:
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    for (FileStatus file : super.listStatus(job)) {
      if (file.getPath().getName().endsWith(AvroOutputFormat.EXT)) {
        // Record this file's schema in the job configuration, keyed by its
        // path, so the record reader can look it up later.
        this.setJobSchemas(job, file);
        result.add(file);
      }
    }
    return result.toArray(new FileStatus[0]);
  }
  private void setJobSchemas(JobConf job, FileStatus file) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(file.getPath().toUri(), conf);
    FSDataInputStream hdfsInStream = fs.open(file.getPath());
    // Read the schema out of the Avro file header and stash it in the job
    // configuration under "<path>-schema".
    DataFileStream<Object> stream =
        new DataFileStream<Object>(hdfsInStream, new GenericDatumReader<Object>());
    job.set(file.getPath() + "-schema", stream.getSchema().toString());
    stream.close();
    // Note: fs is not closed here, since FileSystem.get() returns a cached
    // instance that is shared with the rest of the job.
  }
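
To make it clearer how these "<path>-schema" entries are meant to be used, here is a small
illustrative helper (the class name AvroSchemaLookup is made up for this mail, it is not
part of the patch):

  import org.apache.avro.Schema;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.JobConf;

  public class AvroSchemaLookup {
    // Returns the schema recorded by setJobSchemas() for the given input
    // path, or null if no schema was stored for that path.
    public static Schema forPath(JobConf job, Path path) {
      String json = job.get(path.toString() + "-schema");
      return json == null ? null : Schema.parse(json);
    }
  }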
2011/3/21 幻 <[email protected]>
> Thanks. I ran into this problem because I want to use Avro as the storage
> format for Hive. I've now found a solution, but I'm not sure whether it's
> good enough. I changed AvroInputFormat to:
>   protected FileStatus[] listStatus(JobConf job) throws IOException {
>     List<FileStatus> result = new ArrayList<FileStatus>();
>     for (FileStatus file : super.listStatus(job)) {
>       if (file.getPath().getName().endsWith(AvroOutputFormat.EXT)) {
>         this.setJobSchemas(job, file);
>         result.add(file);
>       }
>     }
>     return result.toArray(new FileStatus[0]);
>   }
> And in AvroRecordReader:
>   public AvroRecordReader(JobConf job, FileSplit split) throws IOException {
>     /* Original constructor call:
>     this(DataFileReader.openReader
>          (new FsInput(split.getPath(), job),
>           job.getBoolean(AvroJob.INPUT_IS_REFLECT, false)
>           ? new ReflectDatumReader<T>(AvroJob.getInputSchema(job))
>           : new SpecificDatumReader<T>(AvroJob.getInputSchema(job))),
>          split);
>     */
>     // Instead of the single job-wide input schema, use the schema that
>     // listStatus() stored for this split's path.
>     this(DataFileReader.openReader
>          (new FsInput(split.getPath(), job),
>           job.getBoolean(AvroJob.INPUT_IS_REFLECT, false)
>           ? new ReflectDatumReader<T>(
>               Schema.parse(job.get(split.getPath().toString() + "-schema")))
>           : new SpecificDatumReader<T>(
>               Schema.parse(job.get(split.getPath().toString() + "-schema")))),
>          split);
>   }
>
> 2011/3/19 Doug Cutting <[email protected]>
>
>> On 03/18/2011 11:31 AM, Harsh J wrote:
>> > Probably a small case, in which I would require reading from multiple
>> > sources in my job (perhaps even processing them differently up to the Map
>> > phase), with special reader schemas for each of my sources.
>>
>> How would your mapper detect which schema was in use? Would it use
>> something like instanceof? If that's the case, then you could simply
>> use a union as the job's schema.
>>
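
(A rough sketch of the union idea as I understand it; RecordA and RecordB below are
made-up generated record classes and the mapper body is only illustrative:)

  import java.io.IOException;
  import java.util.Arrays;

  import org.apache.avro.Schema;
  import org.apache.avro.mapred.AvroCollector;
  import org.apache.avro.mapred.AvroJob;
  import org.apache.avro.mapred.AvroMapper;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.Reporter;

  public class UnionSchemaMapper extends AvroMapper<Object, Object> {

    public static void setSchemas(JobConf job) {
      // Declare the job's input schema as a union of the two record schemas.
      AvroJob.setInputSchema(job,
          Schema.createUnion(Arrays.asList(RecordA.SCHEMA$, RecordB.SCHEMA$)));
    }

    @Override
    public void map(Object datum, AvroCollector<Object> collector, Reporter reporter)
        throws IOException {
      // Branch on the concrete type of each record.
      if (datum instanceof RecordA) {
        // ...handle a RecordA
      } else if (datum instanceof RecordB) {
        // ...handle a RecordB
      }
    }
  }
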
>> Or would you want a different mapper for each input type? That seems
>> like a higher-level tool, like Hadoop's MultipleInputs, which shouldn't
>> be too hard to build, but which I don't think should be built into the
>> base MapReduce API; rather it belongs in a layer above it, no?
>>
>> Doug
>>
>
>