Hello, does anyone out there know about Avro file formats and/or Hadoop support?
My Hadoop AvroJob code does not recognize the Avro files created by my
other code. It seems that the magic number at the start of the file is wrong.
What is going on? How many different ways of encoding Avro files are
there, and how do I make sure the writer and the reader match?
I am creating the input files like this...
    static public void write(String file, GenericRecord record, Schema schema)
            throws IOException {
        OutputStream o = new FileOutputStream(file);
        GenericDatumWriter w = new GenericDatumWriter(schema);
        Encoder e = EncoderFactory.get().binaryEncoder(o, null);
        w.write(record, e);
        e.flush();
    }
Hadoop reads them using org.apache.avro.file.DataFileReader.
Here is where it breaks (the line marked with >>> <<<). I checked, and it
really is trying to read the right file...
    /** Open a reader for a file. */
    public static <D> FileReader<D> openReader(SeekableInput in,
                                               DatumReader<D> reader)
            throws IOException {
        if (in.length() < MAGIC.length)
            throw new IOException("Not an Avro data file");
        // read magic header
        byte[] magic = new byte[MAGIC.length];
        in.seek(0);
        for (int c = 0; c < magic.length; c = in.read(magic, c, magic.length-c)) {}
        in.seek(0);
        if (Arrays.equals(MAGIC, magic)) // current format
            return new DataFileReader<D>(in, reader);
        if (Arrays.equals(DataFileReader12.MAGIC, magic)) // 1.2 format
            return new DataFileReader12<D>(in, reader);
    >>> throw new IOException("Not an Avro data file"); <<<
    }
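To see what is actually at the start of one of these files, independent of Hadoop, the header check above can be reproduced with plain java.io. This is only a minimal sketch: the class and method names (AvroMagicCheck, startsWithMagic) are made up for illustration, and the four bytes are the ones the Avro specification gives for the object container file header ('O', 'b', 'j', followed by the version byte 1). A bare binaryEncoder writes only the encoded datum and no header, so a file produced that way would not be expected to pass this check.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

// Hypothetical helper, not part of Avro: checks whether a stream begins
// with the Avro object container file magic bytes.
public class AvroMagicCheck {
    // 'O', 'b', 'j', then the container format version byte 1.
    static final byte[] MAGIC = {'O', 'b', 'j', 1};

    static boolean startsWithMagic(InputStream in) throws IOException {
        byte[] head = new byte[MAGIC.length];
        int n = 0;
        // read() may return fewer bytes than requested, so accumulate.
        while (n < head.length) {
            int r = in.read(head, n, head.length - n);
            if (r < 0) {
                return false; // stream is shorter than the header
            }
            n += r;
        }
        return Arrays.equals(MAGIC, head);
    }

    public static void main(String[] args) throws IOException {
        // args[0] is the path of the file to inspect.
        try (InputStream in = new FileInputStream(args[0])) {
            System.out.println(args[0]
                + (startsWithMagic(in) ? " starts with" : " does not start with")
                + " the Avro container magic");
        }
    }
}
```

Running it as `java AvroMagicCheck <file>` on one of the input files shows whether the failure comes from the file contents or from something in the job setup.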
Some background...
I am trying to write my first Avro Hadoop application. I am using
Hadoop Cloudera 20.2-737 and Avro 1.5.1.
I followed the instructions here...
http://avro.apache.org/docs/current/api/java/org/apache/avro/mapred/package-summary.html#package_description
The sample code is here...
http://svn.apache.org/viewvc/avro/tags/release-1.5.1/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWordCount.java?view=markup
Here is my code, which breaks with a "Not an Avro data file" error.
    public static class MapImpl extends AvroMapper<Account, Pair<Utf8, Long>> {
        @Override
        public void map(Account account,
                        AvroCollector<Pair<Utf8, Long>> collector,
                        Reporter reporter) throws IOException {
            StringTokenizer tokens =
                new StringTokenizer(account.timestamp.toString());
            while (tokens.hasMoreTokens())
                collector.collect(
                    new Pair<Utf8, Long>(new Utf8(tokens.nextToken()), 1L));
        }
    }

    public static class ReduceImpl
            extends AvroReducer<Utf8, Long, Pair<Utf8, Long>> {
        @Override
        public void reduce(Utf8 word, Iterable<Long> counts,
                           AvroCollector<Pair<Utf8, Long>> collector,
                           Reporter reporter) throws IOException {
            long sum = 0;
            for (long count : counts)
                sum += count;
            collector.collect(new Pair<Utf8, Long>(word, sum));
        }
    }
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: " + getClass().getName()
                + " <input> <output>");
            System.exit(2);
        }
        JobConf job = new JobConf(this.getClass());
        Path outputPath = new Path(args[1]);
        outputPath.getFileSystem(job).delete(outputPath);
        //WordCountUtil.writeLinesFile();
        job.setJobName(this.getClass().getName());
        AvroJob.setInputSchema(job, Account.schema);
        //Schema.create(Schema.Type.STRING));
        AvroJob.setOutputSchema(job,
            new Pair<Utf8, Long>(new Utf8(""), 0L).getSchema());
        AvroJob.setMapperClass(job, MapImpl.class);
        AvroJob.setCombinerClass(job, ReduceImpl.class);
        AvroJob.setReducerClass(job, ReduceImpl.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        //FileOutputFormat.setCompressOutput(job, true);
        //WordCountUtil.setMeta(job);
        JobClient.runJob(job);
        //WordCountUtil.validateCountsFile();
        return 0;
    }