This is cross-posted to the avro-user list (http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3ccf3612f6.94d2%25john.pau...@threattrack.com%3e).
Hello all, I’m having an issue using AvroMultipleOutputs in a map/reduce job. The issue occurs when using a schema that has a field whose type is a union of null and a fixed type (among other complex types), with a default of null, when the field’s value is not null. Please find the full stack trace below and a sample map/reduce job that generates an Avro container file and uses that for the m/r input. Note that I can serialize/deserialize without issue using GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would be helpful. Stack trace: java.lang.Exception: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404) Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296) at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77) at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39) at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400) at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378) at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78) at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330) at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) at 
java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) at java.lang.Thread.run(Thread.java:695) Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58) at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290) ... 16 more Caused by: java.lang.NullPointerException at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457) at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189) at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167) at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608) at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265) at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597) at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71) at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143) at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114) at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175) at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104) at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66) at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143) Sample m/r job: <mr_job> package com.tts.ox.mapreduce.example.avro; import org.apache.avro.Schema; import 
org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.io.DatumWriter; import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapreduce.AvroJob; import org.apache.avro.mapreduce.AvroKeyInputFormat; import org.apache.avro.mapreduce.AvroKeyOutputFormat; import org.apache.avro.mapreduce.AvroMultipleOutputs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.File; import java.io.IOException; public class AvroContainerFileDriver extends Configured implements Tool { // // define a schema with a union of null and fixed private static final String SCHEMA = "{\n" + " \"namespace\": \"com.foo.bar\",\n" + " \"name\": \"simple_schema\",\n" + " \"type\": \"record\",\n" + " \"fields\": [{\n" + " \"name\": \"foo\",\n" + " \"type\": {\n" + " \"name\": \"bar\",\n" + " \"type\": \"fixed\",\n" + " \"size\": 2\n" + " }\n" + " }, {\n" + " \"name\": \"baz\",\n" + " \"type\": [\"null\", \"bar\"],\n" + " \"default\": null\n" + " }]\n" + "}"; public static class SampleMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> { private AvroMultipleOutputs amos; @Override protected void setup(Context context) { amos = new AvroMultipleOutputs(context); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { amos.close(); } @Override protected void 
map(AvroKey<GenericRecord> record, NullWritable ignore, Context context) throws IOException, InterruptedException { // simply write the record to a container using AvroMultipleOutputs amos.write("avro", new AvroKey<GenericRecord>(record.datum()), NullWritable.get()); } } @Override public int run(final String[] args) throws Exception { Schema.Parser parser = new Schema.Parser(); Schema schema = parser.parse(SCHEMA); // // generate avro container file for input to mapper byte[] dummy = {(byte) 0x01, (byte) 0x02}; GenericData.Fixed foo = new GenericData.Fixed(schema.getField("foo").schema(), dummy); GenericData.Fixed baz = new GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy); GenericRecordBuilder builder = new GenericRecordBuilder(schema) .set(schema.getField("foo"), foo); GenericRecord record0 = builder.build(); // baz is null builder.set(schema.getField("baz"), baz); GenericRecord record1 = builder.build(); // baz is not null, bad news File file = new File("/tmp/avrotest/input/test.avro"); DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema); DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); dataFileWriter.create(schema, file); dataFileWriter.append(record0); // // HELP: job succeeds when we do not have record with non-null baz, comment out to succeed // dataFileWriter.append(record1); dataFileWriter.close(); // // configure and run job Configuration configuration = new Configuration(); String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs(); Job job = Job.getInstance(configuration, "Sample Avro Map Reduce"); job.setInputFormatClass(AvroKeyInputFormat.class); AvroJob.setInputKeySchema(job, schema); job.setMapperClass(SampleMapper.class); job.setNumReduceTasks(0); AvroJob.setOutputKeySchema(job, schema); AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema); FileInputFormat.addInputPath(job, new 
Path(("/tmp/avrotest/input"))); FileOutputFormat.setOutputPath(job, new Path("/tmp/avrotest/output")); return (job.waitForCompletion(true) ? 0 : 1); } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new AvroContainerFileDriver(), args); System.exit(exitCode); } } </mr_job> Thanks, John Pauley Sr. Software Engineer ThreatTrack Security