We're running into issues with combiners on a MapReduce job that uses avro mapred.
Over a small data set, the map-side combiners run and report that they combined records.
Over a larger data set, the combiners run and report combining 1.4 billion records down to
6 million, but the reduce phase then fails with:
2011-10-19 21:37:34,777 WARN org.apache.hadoop.mapred.ReduceTask: attempt_201109220009_0156_r_000000_0 Merge of the inmemory files threw an exception: java.io.IOException: Intermediate merge failed
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2714)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2639)
Caused by: org.apache.avro.AvroRuntimeException: No field named rowKey in: null
at org.apache.avro.reflect.ReflectData.findField(ReflectData.java:194)
at org.apache.avro.reflect.ReflectData.getField(ReflectData.java:179)
at org.apache.avro.reflect.ReflectData.getField(ReflectData.java:96)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:102)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:65)
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:102)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:57)
at org.apache.avro.mapred.AvroSerialization$AvroWrapperSerializer.serialize(AvroSerialization.java:131)
at org.apache.avro.mapred.AvroSerialization$AvroWrapperSerializer.serialize(AvroSerialization.java:114)
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:179)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1025)
at org.apache.avro.mapred.HadoopCombiner$PairCollector.collect(HadoopCombiner.java:52)
at org.apache.avro.mapred.HadoopCombiner$PairCollector.collect(HadoopCombiner.java:40)
at com.ngmoco.ngpipes.sourcing.NgBucketingEventCountingCombiner.reduce(NgBucketingEventCountingCombiner.java:63)
at com.ngmoco.ngpipes.sourcing.NgBucketingEventCountingCombiner.reduce(NgBucketingEventCountingCombiner.java:17)
at org.apache.avro.mapred.HadoopReducerBase.reduce(HadoopReducerBase.java:61)
at org.apache.avro.mapred.HadoopReducerBase.reduce(HadoopReducerBase.java:30)
at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1296)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2701)
... 1 more
rowKey is only present in our output schema, so from looking at the code it appears the
combiner is using the wrong collector and serializing its output against the output schema
rather than the intermediate map-output schema.
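
For context, here's a stripped-down sketch of how a job like ours is wired (the class and
field names are illustrative, not our actual NgPipes code). The intermediate map/combine
records use the Pair schema set via AvroJob.setMapOutputSchema, while rowKey exists only in
the reflect record set via AvroJob.setOutputSchema, which is why a combiner whose collector
serializes against the output schema would blow up exactly as above:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.Pair;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

// Illustrative sketch only -- not our production classes.
public class JobSketch {

  /** Final-output record; this is the only schema that declares rowKey. */
  public static class OutputRecord {
    public String rowKey;
    public long count;
  }

  /** Combiner: sums counts and re-emits intermediate Pairs, never OutputRecords. */
  public static class CountCombiner
      extends AvroReducer<Utf8, Long, Pair<Utf8, Long>> {
    @Override
    public void reduce(Utf8 key, Iterable<Long> counts,
                       AvroCollector<Pair<Utf8, Long>> collector,
                       Reporter reporter) throws IOException {
      long sum = 0;
      for (long c : counts) {
        sum += c;
      }
      // The collector handed in here should serialize against the map-output
      // Pair schema, which has no rowKey field.
      collector.collect(new Pair<Utf8, Long>(key, sum));
    }
  }

  public static void configure(JobConf job) {
    // Intermediate (map -> combine -> reduce input) schema: a key/value Pair.
    Schema mapOutput = Pair.getPairSchema(
        Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.LONG));
    // Final output schema: the only place rowKey appears.
    Schema output = ReflectData.get().getSchema(OutputRecord.class);

    AvroJob.setMapOutputSchema(job, mapOutput);
    AvroJob.setOutputSchema(job, output);
    AvroJob.setCombinerClass(job, CountCombiner.class);
    // Mapper/reducer registration and reflect-specific setup omitted.
  }
}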
Commenting out the combiner makes everything work, and running over a smaller dataset also
works. Basically, anything that keeps the code path added by
https://issues.apache.org/jira/browse/HADOOP-3226 from running means the job succeeds.
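
In terms of the sketch above, the workaround is just leaving the combiner unregistered
(same hypothetical names as before):

  public static void configureWithoutCombiner(JobConf job) {
    AvroJob.setMapOutputSchema(job, Pair.getPairSchema(
        Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.LONG)));
    AvroJob.setOutputSchema(job, ReflectData.get().getSchema(OutputRecord.class));
    // AvroJob.setCombinerClass(job, CountCombiner.class);  // disabled -- job then completes
  }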
Any ideas on how to fix this? The Hadoop patch above was committed to trunk without any
additional tests, so I'm not sure how to get this to reproduce at a small, non-distributed
scale for a unit test.