Which Avro version are you using when running outside of Hadoop?

Regards,
Stanley Shi
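P.S. If it helps narrow things down, one quick way to confirm which Avro jar is actually on the classpath in each environment is a snippet along these lines (a minimal sketch; the manifest version may be null if the jar was built without one):

<version_check>
import org.apache.avro.Schema;

import java.security.CodeSource;

public class AvroVersionCheck {
    public static void main(String[] args) {
        // Manifest Implementation-Version of the jar providing Schema,
        // if the jar was built with one (may print null otherwise).
        System.out.println("Avro version: "
                + Schema.class.getPackage().getImplementationVersion());

        // Where the Schema class was actually loaded from; some class
        // loaders report no code source, hence the null guard.
        CodeSource src = Schema.class.getProtectionDomain().getCodeSource();
        System.out.println("Loaded from: " + (src == null ? "unknown" : src.getLocation()));
    }
}
</version_check>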
On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <john.pau...@threattrack.com> wrote:

> This is cross-posted to the avro-user list (
> http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3ccf3612f6.94d2%25john.pau...@threattrack.com%3e
> ).
>
> Hello all,
>
> I’m having an issue using AvroMultipleOutputs in a map/reduce job. The
> issue occurs when using a schema that has a union of null and a fixed
> (among other complex types) with a default of null, and the value is
> not null. Please find the full stack trace below, along with a sample
> map/reduce job that generates an Avro container file and uses it as the
> m/r input. Note that I can serialize/deserialize without issue using
> GenericDatumWriter/GenericDatumReader outside of Hadoop. Any insight
> would be helpful.
>
> Stack trace:
>
> java.lang.Exception: org.apache.avro.file.DataFileWriter$AppendWriteException:
> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
> Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException:
> java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
>     at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
>     at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
>     at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
>     at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
>     at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
>     at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
>     at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
>     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
>     at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>     at java.lang.Thread.run(Thread.java:695)
> Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
>     at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
>     at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
>     ... 16 more
> Caused by: java.lang.NullPointerException
>     at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
>     at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
>     at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
>     at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
>     at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
>     at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
>     at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
>     at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
>     at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>     at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
>
> Sample m/r job:
>
> <mr_job>
> package com.tts.ox.mapreduce.example.avro;
>
> import org.apache.avro.Schema;
> import org.apache.avro.file.DataFileWriter;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.generic.GenericRecordBuilder;
> import org.apache.avro.io.DatumWriter;
> import org.apache.avro.mapred.AvroKey;
> import org.apache.avro.mapreduce.AvroJob;
> import org.apache.avro.mapreduce.AvroKeyInputFormat;
> import org.apache.avro.mapreduce.AvroKeyOutputFormat;
> import org.apache.avro.mapreduce.AvroMultipleOutputs;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.util.GenericOptionsParser;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> import java.io.File;
> import java.io.IOException;
>
> public class AvroContainerFileDriver extends Configured implements Tool {
>     //
>     // define a schema with a union of null and fixed
>     private static final String SCHEMA = "{\n" +
>             "    \"namespace\": \"com.foo.bar\",\n" +
>             "    \"name\": \"simple_schema\",\n" +
>             "    \"type\": \"record\",\n" +
>             "    \"fields\": [{\n" +
>             "        \"name\": \"foo\",\n" +
>             "        \"type\": {\n" +
>             "            \"name\": \"bar\",\n" +
>             "            \"type\": \"fixed\",\n" +
>             "            \"size\": 2\n" +
>             "        }\n" +
>             "    }, {\n" +
>             "        \"name\": \"baz\",\n" +
>             "        \"type\": [\"null\", \"bar\"],\n" +
>             "        \"default\": null\n" +
>             "    }]\n" +
>             "}";
>
>     public static class SampleMapper extends
>             Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
>         private AvroMultipleOutputs amos;
>
>         @Override
>         protected void setup(Context context) {
>             amos = new AvroMultipleOutputs(context);
>         }
>
>         @Override
>         protected void cleanup(Context context) throws IOException, InterruptedException {
>             amos.close();
>         }
>
>         @Override
>         protected void map(AvroKey<GenericRecord> record, NullWritable ignore, Context context)
>                 throws IOException,
>                 InterruptedException {
>             // simply write the record to a container using AvroMultipleOutputs
>             amos.write("avro", new AvroKey<GenericRecord>(record.datum()), NullWritable.get());
>         }
>     }
>
>     @Override
>     public int run(final String[] args) throws Exception {
>         Schema.Parser parser = new Schema.Parser();
>         Schema schema = parser.parse(SCHEMA);
>
>         //
>         // generate avro container file for input to mapper
>         byte[] dummy = {(byte) 0x01, (byte) 0x02};
>         GenericData.Fixed foo = new GenericData.Fixed(schema.getField("foo").schema(), dummy);
>         GenericData.Fixed baz = new GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);
>
>         GenericRecordBuilder builder = new GenericRecordBuilder(schema)
>                 .set(schema.getField("foo"), foo);
>         GenericRecord record0 = builder.build(); // baz is null
>
>         builder.set(schema.getField("baz"), baz);
>         GenericRecord record1 = builder.build(); // baz is not null, bad news
>
>         File file = new File("/tmp/avrotest/input/test.avro");
>         DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
>         DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
>         dataFileWriter.create(schema, file);
>         dataFileWriter.append(record0);
>         //
>         // HELP: the job succeeds when the input has no record with a non-null
>         // baz; comment out the next line and the job succeeds
>         dataFileWriter.append(record1);
>         dataFileWriter.close();
>
>         //
>         // configure and run job
>         Configuration configuration = new Configuration();
>         String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
>         Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
>
>         job.setInputFormatClass(AvroKeyInputFormat.class);
>         AvroJob.setInputKeySchema(job, schema);
>
>         job.setMapperClass(SampleMapper.class);
>         job.setNumReduceTasks(0);
>
>         AvroJob.setOutputKeySchema(job, schema);
>         AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);
>
>         FileInputFormat.addInputPath(job, new Path("/tmp/avrotest/input"));
>         FileOutputFormat.setOutputPath(job, new Path("/tmp/avrotest/output"));
>
>         return job.waitForCompletion(true) ? 0 : 1;
>     }
>
>     public static void main(String[] args) throws Exception {
>         int exitCode = ToolRunner.run(new AvroContainerFileDriver(), args);
>         System.exit(exitCode);
>     }
> }
> </mr_job>
>
> Thanks,
> John Pauley
> Sr. Software Engineer
> ThreatTrack Security
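For what it's worth, the failing frames in your trace all run through ReflectDatumWriter (AvroKeyRecordWriter appears to hand off to it), while your working standalone test uses GenericDatumWriter. Below is a minimal standalone sketch that exercises both writers on the same record shape, no Hadoop involved. The class name and the inlined schema are illustrative, and the expectation that the ReflectDatumWriter call throws the same NullPointerException is an inference from the stack trace, not something confirmed here:

<standalone_test>
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;

import java.io.ByteArrayOutputStream;

public class UnionFixedRoundTrip {
    // Same shape as the schema in the quoted job: a fixed field plus a
    // union of null and that fixed type, defaulting to null.
    private static final String SCHEMA =
            "{\"namespace\": \"com.foo.bar\", \"name\": \"simple_schema\", \"type\": \"record\","
          + " \"fields\": ["
          + "  {\"name\": \"foo\", \"type\": {\"name\": \"bar\", \"type\": \"fixed\", \"size\": 2}},"
          + "  {\"name\": \"baz\", \"type\": [\"null\", \"bar\"], \"default\": null}]}";

    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(SCHEMA);
        Schema fixedSchema = schema.getField("foo").schema(); // the named fixed type "bar"
        byte[] dummy = {(byte) 0x01, (byte) 0x02};

        GenericRecord record = new GenericRecordBuilder(schema)
                .set("foo", new GenericData.Fixed(fixedSchema, dummy))
                .set("baz", new GenericData.Fixed(fixedSchema, dummy)) // non-null union branch
                .build();

        // 1) GenericDatumWriter: the path reported as working outside Hadoop.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord copy = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        System.out.println("generic round trip ok: " + copy);

        // 2) ReflectDatumWriter: the writer the mapreduce stack trace goes
        // through; if the union-of-fixed problem lives in this code path,
        // this write should fail with the same NullPointerException.
        ByteArrayOutputStream out2 = new ByteArrayOutputStream();
        BinaryEncoder encoder2 = EncoderFactory.get().binaryEncoder(out2, null);
        new ReflectDatumWriter<GenericRecord>(schema).write(record, encoder2);
        encoder2.flush();
        System.out.println("reflect write ok: no NPE on this Avro version");
    }
}
</standalone_test>

If the second write fails where the first succeeds, that would point at the reflect/specific schema-name resolution for GenericData.Fixed rather than at AvroMultipleOutputs itself, which is why the Avro version matters here.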