Let me try that again, without the odd formatting: an Avro data file is not created with a FileOutputStream. That will write Avro binary data to a file, but not in the Avro data file format (which is splittable and contains header metadata).

The API for Avro data files is here:
http://avro.apache.org/docs/current/api/java/org/apache/avro/file/package-summary.html
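For comparison, here is a sketch of the write() method from the message below reworked to use DataFileWriter from that package. The method name and arguments just mirror the original; nothing beyond the DataFileWriter API itself is from the thread:

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumWriter;

    static public void write(String file, GenericRecord record,
                             Schema schema) throws IOException {
        DatumWriter<GenericRecord> datumWriter =
            new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> fileWriter =
            new DataFileWriter<GenericRecord>(datumWriter);
        // create() writes the container header: the magic bytes plus
        // schema and codec metadata that DataFileReader looks for.
        fileWriter.create(schema, new File(file));
        fileWriter.append(record);
        fileWriter.close();  // flushes the final block and sync marker
    }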
On 7/20/11 2:35 PM, "Peter Wolf" <[email protected]> wrote:

> Hello, anyone out there know about Avro file formats and/or Hadoop
> support?
>
> My Hadoop AvroJob code does not recognize the Avro files created by my
> other code. It seems that the MAGIC number is wrong.
>
> What is going on? How many different ways of encoding Avro files are
> there, and how do I make sure they match?
>
> I am creating the input files like this...
>
>     static public void write(String file, GenericRecord record,
>                              Schema schema) throws IOException {
>         OutputStream o = new FileOutputStream(file);
>         GenericDatumWriter w = new GenericDatumWriter(schema);
>         Encoder e = EncoderFactory.get().binaryEncoder(o, null);
>         w.write(record, e);
>         e.flush();
>     }
>
> Hadoop is reading them using org.apache.avro.file.DataFileReader.
>
> Here is where it breaks. I checked, and it really is trying to read the
> right file...
>
>     /** Open a reader for a file. */
>     public static <D> FileReader<D> openReader(SeekableInput in,
>                                                DatumReader<D> reader)
>         throws IOException {
>       if (in.length() < MAGIC.length)
>         throw new IOException("Not an Avro data file");
>
>       // read magic header
>       byte[] magic = new byte[MAGIC.length];
>       in.seek(0);
>       for (int c = 0; c < magic.length;
>            c = in.read(magic, c, magic.length - c)) {}
>       in.seek(0);
>
>       if (Arrays.equals(MAGIC, magic))                    // current format
>         return new DataFileReader<D>(in, reader);
>       if (Arrays.equals(DataFileReader12.MAGIC, magic))   // 1.2 format
>         return new DataFileReader12<D>(in, reader);
>
>       throw new IOException("Not an Avro data file");     // <-- fails here
>     }
>
> Some background...
>
> I am trying to write my first Avro Hadoop application. I am using Hadoop
> Cloudera 20.2-737 and Avro 1.5.1.
>
> I followed the instructions here...
>
> http://avro.apache.org/docs/current/api/java/org/apache/avro/mapred/package-summary.html#package_description
>
> The sample code here...
>
> http://svn.apache.org/viewvc/avro/tags/release-1.5.1/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWordCount.java?view=markup
>
> Here is my code which breaks with a "Not an Avro data file" error.
>
>     public static class MapImpl
>         extends AvroMapper<Account, Pair<Utf8, Long>> {
>       @Override
>       public void map(Account account,
>                       AvroCollector<Pair<Utf8, Long>> collector,
>                       Reporter reporter) throws IOException {
>         StringTokenizer tokens =
>             new StringTokenizer(account.timestamp.toString());
>         while (tokens.hasMoreTokens())
>           collector.collect(
>               new Pair<Utf8, Long>(new Utf8(tokens.nextToken()), 1L));
>       }
>     }
>
>     public static class ReduceImpl
>         extends AvroReducer<Utf8, Long, Pair<Utf8, Long>> {
>       @Override
>       public void reduce(Utf8 word, Iterable<Long> counts,
>                          AvroCollector<Pair<Utf8, Long>> collector,
>                          Reporter reporter) throws IOException {
>         long sum = 0;
>         for (long count : counts)
>           sum += count;
>         collector.collect(new Pair<Utf8, Long>(word, sum));
>       }
>     }
>
>     public int run(String[] args) throws Exception {
>       if (args.length != 2) {
>         System.err.println("Usage: " + getClass().getName() +
>                            " <input> <output>");
>         System.exit(2);
>       }
>
>       JobConf job = new JobConf(this.getClass());
>       Path outputPath = new Path(args[1]);
>
>       outputPath.getFileSystem(job).delete(outputPath);
>       //WordCountUtil.writeLinesFile();
>
>       job.setJobName(this.getClass().getName());
>
>       AvroJob.setInputSchema(job, Account.schema);
>       //Schema.create(Schema.Type.STRING));
>       AvroJob.setOutputSchema(job,
>           new Pair<Utf8, Long>(new Utf8(""), 0L).getSchema());
>
>       AvroJob.setMapperClass(job, MapImpl.class);
>       AvroJob.setCombinerClass(job, ReduceImpl.class);
>       AvroJob.setReducerClass(job, ReduceImpl.class);
>
>       FileInputFormat.setInputPaths(job, new Path(args[0]));
>       FileOutputFormat.setOutputPath(job, outputPath);
>       //FileOutputFormat.setCompressOutput(job, true);
>
>       //WordCountUtil.setMeta(job);
>
>       JobClient.runJob(job);
>
>       //WordCountUtil.validateCountsFile();
>
>       return 0;
>     }
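A quick way to check whether a file on disk really is an Avro data file is to open it directly with DataFileReader. A sketch (the dump() helper is hypothetical, assuming generic records): it will throw "Not an Avro data file" on raw binary-encoded output, and print the embedded schema and records for a proper container file:

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    static void dump(String file) throws IOException {
        // DataFileReader checks the magic header, then reads the schema
        // from the file's own metadata.
        DataFileReader<GenericRecord> reader =
            new DataFileReader<GenericRecord>(new File(file),
                new GenericDatumReader<GenericRecord>());
        System.out.println("schema: " + reader.getSchema());
        while (reader.hasNext())
            System.out.println(reader.next());
        reader.close();
    }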
