Hi,

I am trying to run a MapReduce job on EMR that uses AvroMultipleOutputs.

When running on an AMI with Hadoop 2.2/2.5 I get the following exception:

    Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected

I read that this comes from mixing incompatible Hadoop versions (an avro-mapred jar built against Hadoop 1 running on a Hadoop 2 cluster; apparently the hadoop2 Maven classifier of avro-mapred exists for exactly this), so I switched to an AMI with Hadoop 1.0.3. There I get the following exception instead:
java.lang.NullPointerException
    at org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)
The driver code:

    Job job = new Job(getConf(), "myad");
    job.setOutputValueClass(NullWritable.class); // overridden below by UberRecord.class
    job.setJarByClass(myAdTextLineMapper.class);

    Path inputOflineFiles = new Path(args[0]);
    Path inputOfUbberFiles = new Path(args[1]); // parsed but never added to the job (see below)
    FileInputFormat.setInputPaths(job, inputOflineFiles);

    job.setMapperClass(myAdTextLineMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(UberRecord.class);

    job.setOutputFormatClass(AvroSequenceFileOutputFormat.class); // overridden by LazyOutputFormat below
    AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
    AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);

    job.setReducerClass(myAdReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(UberRecord.class);
    job.setNumReduceTasks(2);

    String baseoutputFolder = args[2];
    job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER, baseoutputFolder);

    LazyOutputFormat.setOutputFormatClass(job, AvroSequenceFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
    return job.waitForCompletion(true) ? 0 : 1;
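A side note: inputOfUbberFiles (args[1]) is parsed but never wired into the job. If it is meant to be a second input, I assume it needs something like the line below, though I doubt it is related to the exception:

    FileInputFormat.addInputPath(job, inputOfUbberFiles);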
The mapper and reducer (trimmed to the relevant parts):

    @Override
    public void setup(Context ctx) {
        ubp = new UberRecordProcessor();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            handleLineinMap(value); // sets the ub field from the parsed line
            if (ub != null) {
                context.write(new Text(ub.getAuctionId().toString()), ub);
                context.getCounter("myAd", "myAdTextLineMapper").increment(1);
            } else {
                context.getCounter("myAd", "myAdTextLineMapperNull").increment(1);
            }
        } catch (Exception e) {
            context.getCounter("myAd", "myAdTextLineMapperError").increment(1);
            logger.warn("could not parse line " + value.toString(), e);
        }
    }
    public class myAdReducer extends
            Reducer<Text, UberRecord, AvroKey<CharSequence>, AvroValue<UberRecord>> {

        private static Logger logger = Logger.getLogger(myAdReducer.class);
        public static final String BASE_OUTPUT_FOLDER = "base.output.folder";

        AvroMultipleOutputs amos;
        MultipleOutputs<Text, UberRecord> outputs; // declared but not used below
        UberRecordProcessor ubp = new UberRecordProcessor();

        // "year=%s/month=%s/day=%s/hour=%s"
        private String baseOutputPath;
        private long reduceAttemptUniqueIdentifier = System.currentTimeMillis();

        // 2015-02-01T18:00:25.673Z
        static DateTimeFormatter dateformat = DateTimeFormat
                .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            amos = new AvroMultipleOutputs(context);
            baseOutputPath = context.getConfiguration().get(BASE_OUTPUT_FOLDER);
        }

        @Override
        protected void reduce(Text key, Iterable<UberRecord> values, Context context)
                throws IOException, InterruptedException {
            try {
                UberRecord ub = new UberRecord();
                for (UberRecord ubi : values) {
                    // enrich
                    if (ubi.getExchange() == null) {
                        continue;
                    }
                    BaseBidRequestEnricher enc = BaseBidRequestEnricher
                            .getEnricher(ubi.getExchange().toString());
                    enc.enrich(ubi);
                    ub = mergeUB(ub, ubi);
                }
                logger.info("Writing UberRecord [" + ub.toString() + "]");
                String partition = getPartition(ub); // partition helper not shown
                amos.write(new AvroKey<CharSequence>(key.toString()),
                        new AvroValue<UberRecord>(ub),
                        baseOutputPath + "/" + partition + "/p" + reduceAttemptUniqueIdentifier);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public UberRecord mergeUB(UberRecord dest, UberRecord src) {
            List<Field> fields = UberRecord.getClassSchema().getFields();
            List<Field> engFields = EngageData.getClassSchema().getFields();
            for (Field field : fields) {
                // merge engageData field-by-field; copy any other null field from src
                if (field.name().equals("engageData")
                        && dest.getEngageData() != null) {
                    EngageData mergedEng = dest.getEngageData();
                    for (Field engField : engFields) {
                        if (dest.getEngageData().get(engField.name()) == null) {
                            mergedEng.put(engField.name(),
                                    src.getEngageData().get(engField.name()));
                        }
                    }
                    dest.setEngageData(mergedEng);
                } else {
                    if (dest.get(field.name()) == null) {
                        dest.put(field.name(), src.get(field.name()));
                    }
                }
            }
            return dest;
        }
    }
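One more thing I am not sure about: as posted, the reducer never closes the AvroMultipleOutputs. My understanding is that amos keeps its own record writers, so without a cleanup() like the following the output may never be flushed:

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // AvroMultipleOutputs keeps its own record writers; close() flushes and closes them.
        amos.close();
    }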