Hi All,

I am working on a modified version of the Avro MapReduce support to make it 
play nice with the new Hadoop API (0.20.2). Most of the code is borrowed from 
the Avro mapred package, but I decided not to fully abstract away the Mapper 
and Reducer classes (as Avro currently does with its HadoopMapper and 
HadoopReducer classes). Everything else is much the same as the mapred 
implementation.

When testing, I ran into an issue when emitting empty strings (empty Utf8) 
from the mapper as keys. I get the following:
org.apache.avro.AvroRuntimeException: java.io.EOFException
        at org.apache.avro.io.BinaryData.compare(BinaryData.java:74)
        at org.apache.avro.io.BinaryData.compare(BinaryData.java:60)
        at org.apache.avro.mapreduce.AvroKeyComparator.compare(AvroKeyComparator.java:45)   <== this is my own code
        at org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:120)
        at org.apache.hadoop.mapreduce.ReduceContext.nextKey(ReduceContext.java:92)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:175)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:256)
Caused by: java.io.EOFException
        at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:182)
        at org.apache.avro.generic.GenericDatumReader.skip(GenericDatumReader.java:389)
        at org.apache.avro.io.BinaryData.compare(BinaryData.java:86)
        at org.apache.avro.io.BinaryData.compare(BinaryData.java:72)
        ... 8 more
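For what it's worth, Avro binary-encodes a string as a zig-zag varint length 
followed by the raw UTF-8 bytes, so an empty Utf8 serializes to a single 0x00 
byte with no body at all. A plain-Java sketch of that encoding (the helper 
names are mine, not Avro's API):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class EmptyStringEncoding {
    // Write a long as a zig-zag varint, the way Avro's binary encoding does.
    static void writeVarLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);          // zig-zag encode
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Encode a string per the Avro spec: varint length, then UTF-8 bytes.
    static byte[] encodeString(String s) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeVarLong(out, utf8.length);
        for (byte b : utf8) out.write(b);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] empty = encodeString("");
        byte[] word = encodeString("hi");
        System.out.println(empty.length);   // 1: just the 0x00 length byte
        System.out.println(empty[0]);       // 0
        System.out.println(word.length);    // 3: length byte + two UTF-8 bytes
    }
}
```

So the serialized key for an empty string is the shortest datum the comparator 
can ever see, which may be why this edge case is the one that breaks.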


The root-cause stack trace is as follows (taken from the debugger, with a 
breakpoint on the throw new EOFException(); line):
Thread [Thread-11] (Suspended (breakpoint at line 182 in BinaryDecoder))
        BinaryDecoder.readLong() line: 182
        GenericDatumReader<D>.skip(Schema, Decoder) line: 389
        BinaryData.compare(BinaryData$Decoders, Schema) line: 86
        BinaryData.compare(byte[], int, int, byte[], int, int, Schema) line: 72
        BinaryData.compare(byte[], int, byte[], int, Schema) line: 60
        AvroKeyComparator<T>.compare(byte[], int, int, byte[], int, int) line: 45
        Reducer$Context(ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).nextKeyValue() line: 120
        Reducer$Context(ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).nextKey() line: 92
        AvroMapReduceTest$WordCountingAvroReducer(Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).run(Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>.Context) line: 175
        ReduceTask.runNewReducer(JobConf, TaskUmbilicalProtocol, TaskReporter, RawKeyValueIterator, RawComparator<INKEY>, Class<INKEY>, Class<INVALUE>) line: 572
        ReduceTask.run(JobConf, TaskUmbilicalProtocol) line: 414
        LocalJobRunner$Job.run() line: 256

I went through the decoding code to see where this comes from, but I can't 
immediately spot where it goes wrong. I am guessing the actual problem occurs 
earlier during execution, where the decoder possibly advances pos too often.
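To make that guess concrete: since an empty-string key contributes exactly one 
byte (the zero length) to the serialized data, any code path that consumes 
that byte and then tries to read a second length has nothing left and runs off 
the end of the buffer. A toy plain-Java decoder showing that failure mode (the 
names are mine, not Avro's):

```java
public class OverreadDemo {
    static int pos = 0;

    // Read one zig-zag varint from buf, advancing pos. A toy version of what
    // a binary decoder does; throws when pos runs past the end of the buffer.
    static long readVarLong(byte[] buf) {
        long z = 0;
        int shift = 0;
        int b;
        do {
            if (pos >= buf.length) throw new RuntimeException("EOF: pos=" + pos);
            b = buf[pos++] & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1);            // un-zig-zag
    }

    public static void main(String[] args) {
        byte[] emptyStringKey = {0};            // serialized empty string: length 0, no body
        long len = readVarLong(emptyStringKey); // first read is fine: length = 0
        System.out.println(len);                // 0
        System.out.println(pos);                // 1, the whole buffer is consumed
        try {
            readVarLong(emptyStringKey);        // a second read has nothing left
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // EOF: pos=1
        }
    }
}
```

That would match the trace: readLong() hits EOF while skip() tries to consume 
a string that is no longer there.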

Has anyone experienced this? I can live without emitting empty keys from MR 
jobs, but I ran into this while implementing a word count job on a text file 
with empty lines (counting those could be a valid use case). I am using Avro 
1.5.2.
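For context, the counting logic itself is nothing special; an empty input line 
simply becomes an empty-string key, which is exactly the datum that trips the 
comparator. A plain-Java sketch of the aggregation, with no Hadoop involved 
(all names here are mine):

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // Count occurrences per key; an empty line contributes the
    // empty-string key, the edge case described above.
    static Map<String, Integer> countLines(String[] lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            counts.merge(line.trim(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] input = {"foo", "", "bar", "", "foo"};
        Map<String, Integer> counts = countLines(input);
        System.out.println(counts.get(""));    // 2: the two empty lines
        System.out.println(counts.get("foo")); // 2
    }
}
```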

Thanks for any clues.


Cheers,
Friso
