Thank you for the answer. I tried it, but the exception *Error: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected* persists.
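For reference: this particular error almost always means that a jar on the classpath (most often avro-mapred, which AvroMultipleOutputs lives in) was compiled against Hadoop 1, where org.apache.hadoop.mapreduce.JobContext is a class, while the job runs on Hadoop 2, where it became an interface. Changing how the Job object is constructed cannot fix a binary incompatibility; the Hadoop-2 build of the Avro artifact is needed. A minimal sketch of the usual Maven fix, assuming an Avro 1.7.x build (the coordinates and classifier are the published ones; the exact version shown is only an example):

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
      <version>1.7.7</version>
      <!-- without a classifier, avro-mapred 1.7.x is the Hadoop 1 build;
           the hadoop2 classifier selects the build compiled against the
           Hadoop 2 (interface-based) mapreduce API -->
      <classifier>hadoop2</classifier>
    </dependency>

The same applies to any uber-jar shipped to EMR: the hadoop1 build of avro-mapred must not leak into it via transitive dependencies.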
On Tue, Feb 24, 2015 at 3:12 PM, Artem Ervits <[email protected]> wrote:

> try this
>
>     Job job = Job.getInstance(conf);
>     job.setJobName(name);
>
> Artem Ervits
>
> On Feb 21, 2015 10:57 PM, "David Ginzburg" <[email protected]> wrote:
>
>> Hi,
>>
>> I am trying to run an MR job on EMR with AvroMultipleOutputs.
>>
>> When running with an AMI with Hadoop 2.2 or 2.5, I get the following
>> exception:
>>
>>     Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
>>
>> I read that it is related to incompatible Hadoop versions, so I modified
>> the setup. When running with an AMI with Hadoop 1.0.3, I get the following
>> exception:
>>
>>     java.lang.NullPointerException
>>         at org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>>         at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
>>         at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
>>         at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>>         at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:415)
>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
>>         at org.apache.hadoop.mapred.Child.main(Child.java:253)
>>
>> The driver code is:
>>
>>     Job job = new Job(getConf(), "myad");
>>
>>     job.setJarByClass(myAdTextLineMapper.class);
>>     Path inputOflineFiles = new Path(args[0]);
>>     Path inputOfUbberFiles = new Path(args[1]);
>>     FileInputFormat.setInputPaths(job, inputOflineFiles);
>>
>>     job.setMapperClass(myAdTextLineMapper.class);
>>     job.setMapOutputKeyClass(Text.class);
>>     job.setMapOutputValueClass(UberRecord.class);
>>
>>     job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>>     AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
>>     AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>>
>>     job.setReducerClass(myAdReducer.class);
>>     job.setOutputKeyClass(Text.class);
>>     job.setOutputValueClass(UberRecord.class);
>>     job.setNumReduceTasks(2);
>>
>>     String baseoutputFolder = args[2];
>>     job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER, baseoutputFolder);
>>
>>     LazyOutputFormat.setOutputFormatClass(job, AvroSequenceFileOutputFormat.class);
>>     FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
>>     return job.waitForCompletion(true) ? 0 : 1;
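On the Hadoop 1.0.3 attempt: the NullPointerException inside SerializationFactory.getSerializer typically means the shuffle found no serializer for the map output value class. UberRecord is an Avro record, not a Writable, so job.setMapOutputValueClass(UberRecord.class) leaves Hadoop with nothing registered that can serialize it. A minimal sketch of one common remedy, assuming the Avro 1.7.x mapreduce API already used in the driver above (untested against this job, and it changes the mapper and reducer signatures; AvroValue and AvroJob are the classes already imported in the post):

    // Driver: keep Text as the map output key, but declare the value's Avro
    // schema. AvroJob.setMapOutputValueSchema sets the map output value class
    // to AvroValue.class and registers AvroSerialization in io.serializations,
    // which is exactly the lookup that was returning null.
    job.setMapOutputKeyClass(Text.class); // Text still uses WritableSerialization
    AvroJob.setMapOutputValueSchema(job, UberRecord.SCHEMA$);

    // Mapper: wrap each record before writing it.
    context.write(new Text(ub.getAuctionId().toString()),
            new AvroValue<UberRecord>(ub));

    // Reducer: values now arrive wrapped, so the class declares
    //   Reducer<Text, AvroValue<UberRecord>, AvroKey<CharSequence>, AvroValue<UberRecord>>
    // and each record is unwrapped with ubi.datum() inside the loop.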
>>
>> The mapper and reducer:
>>
>>     @Override
>>     public void setup(Context ctx) {
>>         ubp = new UberRecordProcessor();
>>     }
>>
>>     @Override
>>     protected void map(LongWritable key, Text value, Context context)
>>             throws IOException, InterruptedException {
>>         try {
>>             handleLineinMap(value);
>>             if (ub != null) {
>>                 context.write(new Text(ub.getAuctionId().toString()), ub);
>>                 context.getCounter("myAd", "myAdTextLineMapper").increment(1);
>>             } else {
>>                 context.getCounter("myAd", "myAdTextLineMapperNull").increment(1);
>>             }
>>         } catch (Exception e) {
>>             context.getCounter("myAd", "myAdTextLineMapperError").increment(1);
>>             logger.warn("could not parse line " + value.toString(), e);
>>         }
>>     }
>>
>>     public class myAdReducer extends
>>             Reducer<Text, UberRecord, AvroKey<CharSequence>, AvroValue<UberRecord>> {
>>
>>         private static Logger logger = Logger.getLogger(myAdReducer.class);
>>         public static final String BASE_OUTPUT_FOLDER = "base.output.folder";
>>
>>         AvroMultipleOutputs amos;
>>         MultipleOutputs<Text, UberRecord> outputs;
>>         UberRecordProcessor ubp = new UberRecordProcessor();
>>
>>         // "year=%s/month=%s/day=%s/hour=%s"
>>         private String baseOutputPath;
>>         private long reduceAttemptUniqueIdentifier = System.currentTimeMillis();
>>
>>         // 2015-02-01T18:00:25.673Z
>>         static DateTimeFormatter dateformat =
>>                 DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>>
>>         @Override
>>         protected void setup(Context context)
>>                 throws IOException, InterruptedException {
>>             amos = new AvroMultipleOutputs(context);
>>             baseOutputPath = context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>>         }
>>
>>         @Override
>>         protected void reduce(Text key, Iterable<UberRecord> values, Context context)
>>                 throws IOException, InterruptedException {
>>             try {
>>                 UberRecord ub = new UberRecord();
>>                 for (UberRecord ubi : values) {
>>                     // enrich
>>                     if (ubi.getExchange() == null) {
>>                         continue;
>>                     }
>>                     BaseBidRequestEnricher enc = BaseBidRequestEnricher
>>                             .getEnricher(ubi.getExchange().toString());
>>                     enc.enrich(ubi);
>>                     ub = mergeUB(ub, ubi);
>>                 }
>>                 logger.info("Writing UberRecord [" + ub.toString() + "]");
>>                 String partition = getPartition(ub);
>>
>>                 amos.write(new AvroKey<CharSequence>(key.toString()),
>>                         new AvroValue<UberRecord>(ub),
>>                         baseOutputPath + "/" + partition + "/p"
>>                                 + reduceAttemptUniqueIdentifier);
>>             } catch (Exception e) {
>>                 logger.error("failed to reduce key " + key, e);
>>             }
>>         }
>>
>>         public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>>             List<Field> fields = UberRecord.getClassSchema().getFields();
>>             List<Field> engFields = EngageData.getClassSchema().getFields();
>>             for (Field field : fields) {
>>                 if (field.name().equals("engageData")
>>                         && dest.getEngageData() != null) {
>>                     EngageData mergedEng = dest.getEngageData();
>>                     for (Field engField : engFields) {
>>                         if (dest.getEngageData().get(engField.name()) == null) {
>>                             mergedEng.put(engField.name(),
>>                                     src.getEngageData().get(engField.name()));
>>                         }
>>                     }
>>                     dest.setEngageData(mergedEng);
>>                 } else {
>>                     if (dest.get(field.name()) == null) {
>>                         dest.put(field.name(), src.get(field.name()));
>>                     }
>>                 }
>>             }
>>             return dest;
>>         }
>>     }
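One more thing worth checking in the reducer: AvroMultipleOutputs keeps per-path record writers open, and nothing in the snippet closes them. If the real class has no cleanup() override (none is shown above), files written through amos.write(...) can come out empty or truncated. A minimal sketch, assuming the amos field from the post:

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // Flushes and closes every writer AvroMultipleOutputs opened for
        // the baseOutputPath + partition paths used in reduce().
        amos.close();
    }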
