Hello!
Thank you for your software! I have a problem writing Avro files with
MapReduce. Could you help me, please? I asked my friends and on Stack Overflow,
but nobody could find an answer.
My input is a file containing data for several Hive tables. As the output of
the MapReduce job I want a group of Avro files for each table, each written with
that table's schema. I put the data into a GenericRecord and then try to
write it with AvroMultipleOutputs.
But I get a very strange error from the reducer:
>16/08/23 16:08:08 INFO mapreduce.Job: Task Id :
>attempt_1471450637179_19439_r_000000_0, Status : FAILED Error:
>org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
> 16/08/23 16:08:15 INFO mapreduce.Job: Task Id :
>attempt_1471450637179_19439_r_000000_1, Status : FAILED Error:
>org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
> 16/08/23 16:08:23 INFO mapreduce.Job: Task Id :
>attempt_1471450637179_19439_r_000000_2, Status : FAILED Error:
>org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
I have attached a simplified version of my code.
My input file:
1234567
12345566
0987654321
I am also adding a link to my Stack Overflow question, because the code has better syntax highlighting there.
http://stackoverflow.com/questions/39102348/writing-genericrecord-to-multiple-avro-output
Thank you!
Max Statsenko
----------------------------------------------------------------------
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class CalendarParserTemp {
private static String AVRO_SCHEMA = "{\"type\": \"record\", \"name\":
\"testschema\",\"fields\": [{\"name\": \"val1\", \"type\":
\"string\"},{\"name\": \"val2\", \"type\": [\"null\",\"string\"]}]}";
private static class CalendarMapper extends Mapper<Object, Text, Text, Text>
{
@Override
protected void map(Object key, Text value, Context context) throws
IOException, InterruptedException
{
context.write(new Text("value"), value);
}
}
@SuppressWarnings({"unchecked"})
private static class CalendarReducer extends
Reducer<Text,Text,GenericRecord,NullWritable>
{
private AvroMultipleOutputs amos;
public void setup(Context context) throws IOException,
InterruptedException
{
super.setup(context);
amos = new AvroMultipleOutputs(context);
}
public void cleanup(Context context) throws IOException,
InterruptedException
{
if( amos == null ) throw new IOException("MultipleOutput is not
inited!");
amos.close();
super.cleanup(context);
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context )
throws IOException, InterruptedException
{
if( amos == null ) throw new IOException("MultipleOutput is not
inited!");
Schema.Parser avroSchemaParser = new Schema.Parser();
Schema avroSchema = avroSchemaParser.parse(AVRO_SCHEMA);
for (Text value : values)
{
GenericRecord genericRecord = new
GenericData.Record(avroSchema);
genericRecord.put("val1",value.toString());
genericRecord.put("val2","test");
amos.write("value", genericRecord);
}
}
}
public static void main(String[] args) throws Exception
{
Schema.Parser avroSchemaParser = new Schema.Parser();
Schema avroSchema = avroSchemaParser.parse(AVRO_SCHEMA);
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "MMStatsenko test");
job.addFileToClassPath(new Path("/apps/mr/avro-mapred-1.8.0.jar"));
job.setJarByClass(CalendarParserTemp.class);
job.setMapperClass(CalendarMapper.class);
job.setReducerClass(CalendarReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
AvroMultipleOutputs.addNamedOutput(job, "value",
AvroKeyOutputFormat.class, avroSchema );
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}