Hello!
Thank you for your software! I have a problem writing Avro files from a 
MapReduce job. Could you help me, please? I asked my friends and on 
Stack Overflow, but nobody could find an answer.
My input is a file that contains data for several Hive tables. As the output 
of the MapReduce job I want a group of Avro files for each table, written with 
that table's schema. I put the data into a GenericRecord and then try to write 
it with AvroMultipleOutputs.

But I get a very strange error from the reducer:
>16/08/23 16:08:08 INFO mapreduce.Job: Task Id : 
>attempt_1471450637179_19439_r_000000_0, Status : FAILED Error: 
>org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
> 16/08/23 16:08:15 INFO mapreduce.Job: Task Id : 
>attempt_1471450637179_19439_r_000000_1, Status : FAILED Error: 
>org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
> 16/08/23 16:08:23 INFO mapreduce.Job: Task Id : 
>attempt_1471450637179_19439_r_000000_2, Status : FAILED Error: 
>org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
I have attached a simplified version of my code.

My input file:
1234567
12345566
0987654321
 
Here is a link to the same question on Stack Overflow, where the code has better syntax highlighting:
http://stackoverflow.com/questions/39102348/writing-genericrecord-to-multiple-avro-output
  

Thank you!
Max Statsenko
----------------------------------------------------------------------
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CalendarParserTemp  {

    private static String AVRO_SCHEMA = "{\"type\": \"record\", \"name\": 
\"testschema\",\"fields\": [{\"name\": \"val1\", \"type\": 
\"string\"},{\"name\": \"val2\", \"type\": [\"null\",\"string\"]}]}";
    private static class CalendarMapper extends Mapper<Object, Text, Text, Text>
    {
        @Override
        protected void map(Object key, Text value, Context context) throws 
IOException, InterruptedException
        {
            context.write(new Text("value"), value);
        }
    }

    @SuppressWarnings({"unchecked"})
    private static class CalendarReducer extends 
Reducer<Text,Text,GenericRecord,NullWritable>
    {
        private AvroMultipleOutputs amos;

        public void setup(Context context) throws IOException, 
InterruptedException
        {
            super.setup(context);
            amos = new AvroMultipleOutputs(context);
        }

        public void cleanup(Context context) throws IOException, 
InterruptedException
        {
            if( amos == null ) throw new IOException("MultipleOutput is not 
inited!");
            amos.close();
            super.cleanup(context);
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context ) 
throws IOException, InterruptedException
        {
            if( amos == null ) throw new IOException("MultipleOutput is not 
inited!");
            Schema.Parser avroSchemaParser = new Schema.Parser();
            Schema avroSchema = avroSchemaParser.parse(AVRO_SCHEMA);


            for (Text value : values)
            {
                GenericRecord genericRecord = new 
GenericData.Record(avroSchema);
                genericRecord.put("val1",value.toString());
                genericRecord.put("val2","test");
                amos.write("value", genericRecord);
            }
        }
    }

    public static void main(String[] args) throws Exception
    {
        Schema.Parser avroSchemaParser = new Schema.Parser();
        Schema avroSchema = avroSchemaParser.parse(AVRO_SCHEMA);

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "MMStatsenko test");

        job.addFileToClassPath(new Path("/apps/mr/avro-mapred-1.8.0.jar"));

        job.setJarByClass(CalendarParserTemp.class);

        job.setMapperClass(CalendarMapper.class);
        job.setReducerClass(CalendarReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);


        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        AvroMultipleOutputs.addNamedOutput(job, "value", 
AvroKeyOutputFormat.class, avroSchema );

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}

Reply via email to