Sorry,Some mistakes.This is the AvroInputFormat part:
protected FileStatus[] listStatus(JobConf job) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
  //  job.set
    for (FileStatus file : super.listStatus(job))
      if (file.getPath().getName().endsWith(AvroOutputFormat.EXT)){

     // UtilHelper.localTest("Files:"+file.getPath().toUri()+" |
"+file.getPath().toString(), "/opt/hivelogs/root/mine.log");
      this.setJobSchemas(job, file);
      result.add(file);
      }

    return result.toArray(new FileStatus[0]);
  }
private void setJobSchemas(JobConf job,FileStatus file) throws IOException {
// TODO Auto-generated method stub
 // String dst = "hdfs://localhost:9000/user/hive/warehouse/test/test.avro";

  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(file.getPath().toUri(), conf);
 // FileStatus
  FSDataInputStream hdfsInStream = fs.open(new
Path(file.getPath().toString()));

  DataFileStream stream = new DataFileStream(hdfsInStream,new
GenericDatumReader<Object>());
//  System.out.println(stream.getSchema());
 
UtilHelper.localTest("--Schema:"+stream.getSchema().toString(),"/opt/hivelogs/root/mine.log");
  job.set(file.getPath()+"-schema", stream.getSchema().toString());
  stream.close();
  fs.close();
//  DataFileReader rr = new DataFileReader(hdfsInStream);
}

2011/3/21 幻 <[email protected]>

> Thanks.I meet this problem because I want to use avro as storage format for
> HIVE.Now I find a solution,but I'm not sure if it's good enough:
> I change the AvroInputFormat to:
> protected FileStatus[] listStatus(JobConf job) throws IOException {
>     List<FileStatus> result = new ArrayList<FileStatus>();
>   //  job.set
>     for (FileStatus file : super.listStatus(job))
>       if (file.getPath().getName().endsWith(AvroOutputFormat.EXT)){
>
>      // UtilHelper.localTest("Files:"+file.getPath().toUri()+" |
> "+file.getPath().toString(), "/opt/hivelogs/root/mine.log");
>       this.setJobSchemas(job, file);
>       result.add(file);
>       }
>
>     return result.toArray(new FileStatus[0]);
>   }
> And in AvroRecordReader:
>   public AvroRecordReader(JobConf job, FileSplit split)
>     throws IOException {
> //LOG.info("Here is the file:"+split.getPath().getName());
> // UtilHelper.localTest("Here is the file:"+split.getPath().toString(),
> "/opt/hivelogs/root/mine.log");
>  // UtilHelper.localTest("--Schema
> is:"+job.get(split.getPath().toString()+"-schema"),
> "/opt/hivelogs/root/mine.log");
>  /*
>     this(DataFileReader.openReader
>          (new FsInput(split.getPath(), job),
>           job.getBoolean(AvroJob.INPUT_IS_REFLECT, false)
>           ? new ReflectDatumReader<T>(AvroJob.getInputSchema(job))
>           : new SpecificDatumReader<T>(AvroJob.getInputSchema(job))),
>          split);
>      */
>     this(DataFileReader.openReader
>             (new FsInput(split.getPath(), job),
>              job.getBoolean(AvroJob.INPUT_IS_REFLECT, false)
>              ? new
> ReflectDatumReader<T>(Schema.parse(job.get(split.getPath().toString()+"-schema")))
>              : new
> SpecificDatumReader<T>(Schema.parse(job.get(split.getPath().toString()+"-schema")))),
>             split);
>
>   }
>
> 2011/3/19 Doug Cutting <[email protected]>
>
>> On 03/18/2011 11:31 AM, Harsh J wrote:
>> > Probably a small case, in which I would require reading from multiple
>> > sources in my job (perhaps even process them differently until the Map
>> > phase), with special reader-schemas for each of my sources.
>>
>> How would your mapper detect which schema was in use?  Would it use
>> something like instanceof?  If that's the case, then you could simply
>> use a union as the job's schema.
>>
>> Or would you want a different mapper for each input type?  That seems
>> like a higher-level tool, like Hadoop's MultipleInputs, which shouldn't
>> be too hard to build, but I don't think should be built into the base
>> MapReduce API, but rather a layer above it, no?
>>
>> Doug
>>
>
>

Reply via email to