Hi All,
I am working on a modified version of the Avro MapReduce support to make it
play nicely with the new Hadoop API (0.20.2). Most of the code is borrowed from
the Avro mapred package, but I decided not to fully abstract away the Mapper
and Reducer classes (as Avro currently does with its HadoopMapper and
HadoopReducer classes). Everything else is much the same as the mapred
implementation.
When testing, I ran into an issue when emitting empty strings (empty Utf8
instances) as keys from the mapper. I get the following:
org.apache.avro.AvroRuntimeException: java.io.EOFException
    at org.apache.avro.io.BinaryData.compare(BinaryData.java:74)
    at org.apache.avro.io.BinaryData.compare(BinaryData.java:60)
    at org.apache.avro.mapreduce.AvroKeyComparator.compare(AvroKeyComparator.java:45)  <== this is my own code
    at org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:120)
    at org.apache.hadoop.mapreduce.ReduceContext.nextKey(ReduceContext.java:92)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:175)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:256)
Caused by: java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:182)
    at org.apache.avro.generic.GenericDatumReader.skip(GenericDatumReader.java:389)
    at org.apache.avro.io.BinaryData.compare(BinaryData.java:86)
    at org.apache.avro.io.BinaryData.compare(BinaryData.java:72)
    ... 8 more
The root cause stack trace is as follows (taken from the debugger, with a
breakpoint on the throw new EOFException(); line):
Thread [Thread-11] (Suspended (breakpoint at line 182 in BinaryDecoder))
    BinaryDecoder.readLong() line: 182
    GenericDatumReader<D>.skip(Schema, Decoder) line: 389
    BinaryData.compare(BinaryData$Decoders, Schema) line: 86
    BinaryData.compare(byte[], int, int, byte[], int, int, Schema) line: 72
    BinaryData.compare(byte[], int, byte[], int, Schema) line: 60
    AvroKeyComparator<T>.compare(byte[], int, int, byte[], int, int) line: 45
    Reducer$Context(ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).nextKeyValue() line: 120
    Reducer$Context(ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).nextKey() line: 92
    AvroMapReduceTest$WordCountingAvroReducer(Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).run(Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>.Context) line: 175
    ReduceTask.runNewReducer(JobConf, TaskUmbilicalProtocol, TaskReporter, RawKeyValueIterator, RawComparator<INKEY>, Class<INKEY>, Class<INVALUE>) line: 572
    ReduceTask.run(JobConf, TaskUmbilicalProtocol) line: 414
    LocalJobRunner$Job.run() line: 256
I went through the decoding code to see where this comes from, but I can't
immediately spot where it goes wrong. My guess is that the actual problem
occurs earlier during execution, where pos is possibly incremented one time
too many.
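To make the hypothesis concrete: Avro encodes a string as a zig-zag varint
length followed by the UTF-8 bytes, so an empty Utf8 serializes to the single
byte 0x00. Below is a small self-contained sketch (no Avro dependency; the
class and method names are mine, not Avro's) showing that if position
bookkeeping advances one byte too far past an empty key, the next readLong-style
varint read runs off the end of the buffer and throws EOFException, which would
match the trace above.

```java
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ZigZagDemo {
    // Encode a long using Avro's zig-zag varint format.
    static void writeLong(long n, ByteArrayOutputStream out) {
        long z = (n << 1) ^ (n >> 63);            // zig-zag encode
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // continuation bit set
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Encode a string the way Avro does: varint length, then UTF-8 bytes.
    static byte[] encodeString(String s) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(utf8.length, out);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    // Decode a zig-zag varint starting at pos; throws EOFException if we
    // run off the end of the buffer.
    static long readLong(byte[] buf, int pos) throws IOException {
        long z = 0;
        int shift = 0;
        int b;
        do {
            if (pos >= buf.length) throw new EOFException();
            b = buf[pos++] & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1);              // zig-zag decode
    }

    public static void main(String[] args) throws IOException {
        byte[] empty = encodeString("");
        System.out.println(empty.length);         // 1: just the 0x00 length byte
        System.out.println(readLong(empty, 0));   // 0: the empty string's length
        // An off-by-one in pos reads past the buffer and throws:
        try {
            readLong(empty, 1);
        } catch (EOFException e) {
            System.out.println("EOFException");
        }
    }
}
```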
Has anyone experienced this? I can live without emitting empty keys from MR
jobs, but I ran into this while implementing a word count job on a text file
with empty lines (counting those could be a valid use case). I am using Avro
1.5.2.
Thanks for any clues.
Cheers,
Friso