Hi,
I am trying to run a MapReduce job on EMR with AvroMultipleOutputs.



When running on an AMI with Hadoop 2.2/2.5, I get the following exception:
Found interface org.apache.hadoop.mapreduce.JobContext, but class was
expected

I read that this is related to incompatible Hadoop versions, so I changed the AMI.

When running on an AMI with Hadoop 1.0.3, I get the following exception:

java.lang.NullPointerException
    at
org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
    at
org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
    at
org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)


The driver code is:

// Driver: configures and submits the "myad" job.
// args[0] = input path, args[1] = second input path (see note), args[2] = base output folder.
Job job = new Job(getConf(), "myad");

        job.setJarByClass(myAdTextLineMapper.class);
        Path inputOflineFiles = new Path(args[0]);
        // NOTE(review): this path is constructed but never added to the job's
        // input paths below -- confirm whether the second input is still needed.
        Path inputOfUbberFiles = new Path(args[1]);

        FileInputFormat.setInputPaths(job, inputOflineFiles);

        // Map side: Text key -> UberRecord value.
        // NOTE(review): UberRecord looks like an Avro-generated record (it exposes
        // SCHEMA$). If no Hadoop Serialization accepts that class, the map output
        // buffer cannot obtain a serializer, which would match the
        // NullPointerException in SerializationFactory.getSerializer reported
        // above. The usual remedy is to emit AvroValue<UberRecord> from the mapper
        // and call AvroJob.setMapOutputValueSchema(job, UberRecord.SCHEMA$), but
        // that also changes the mapper and reducer type parameters -- confirm.
        job.setMapperClass(myAdTextLineMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(UberRecord.class);

        // Reduce side: the reducer emits AvroKey<CharSequence>/AvroValue<UberRecord>.
        // AvroJob.setOutputKeySchema/setOutputValueSchema register AvroKey and
        // AvroValue as the job's output classes, so the output key/value classes
        // must NOT be overwritten afterwards with Text/UberRecord (or NullWritable).
        job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
        AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);

        job.setReducerClass(myAdReducer.class);
        job.setNumReduceTasks(2);

        // The reducer reads this property to build its per-partition output paths.
        String baseoutputFolder = args[2];
        job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER,
                baseoutputFolder);

        // Wrap the real output format so that no default output file is created
        // unless something is actually written through the job's own output.
        LazyOutputFormat.setOutputFormatClass(job, AvroSequenceFileOutputFormat.class);

        FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
        return job.waitForCompletion(true) ? 0 : 1;


The mapper and reducer:
/**
 * Task set-up hook: creates a fresh {@code UberRecordProcessor} and stores it
 * in {@code ubp} before any input is mapped.
 *
 * @param ctx the task context (not used here)
 */
@Override
    public void setup(Context ctx) {

        ubp = new UberRecordProcessor();
    }

    /**
     * Parses one input line and, if it yields a record, emits it keyed by the
     * record's auction id.
     *
     * <p>Only parsing and key construction are treated as best-effort: a failure
     * there is counted under {@code myAdTextLineMapperError}, logged, and the line
     * is skipped. Failures of {@code context.write} are no longer swallowed as
     * "could not parse line"; they propagate so the framework can fail or retry
     * the task instead of silently dropping output.
     *
     * @param key     input key supplied by the framework (not used here)
     * @param value   the raw input line
     * @param context task context used for output and counters
     * @throws IOException          if writing the map output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Text outKey = null;
        try {
            // presumably handleLineinMap stores its result in the `ub` field --
            // confirm that it also resets `ub` for lines that yield no record.
            handleLineinMap(value);
            if (ub != null) {
                outKey = new Text(ub.getAuctionId().toString());
            }
        } catch (Exception e) {
            context.getCounter("myAd", "myAdTextLineMapperError").increment(1);
            logger.warn("could not parse line " + value.toString(), e);
            return;
        }

        if (outKey == null) {
            context.getCounter("myAd", "myAdTextLineMapperNull").increment(1);
            return;
        }

        // Outside the try block on purpose: an output failure is not a parse error.
        context.write(outKey, ub);
        context.getCounter("myAd", "myAdTextLineMapper").increment(1);
    }

public class myAdReducer extends
        Reducer<Text, UberRecord, AvroKey<CharSequence>,
AvroValue<UberRecord>> {

    // Logger for this reducer class.
    private static Logger logger = Logger.getLogger(myAdReducer.class);
    // Configuration key under which the base output folder is stored; setup()
    // reads it back from the task configuration.
    public static final String BASE_OUTPUT_FOLDER = "base.output.folder";
    // amos is created in setup() and used by reduce() to write each merged record.
    // NOTE(review): `outputs` is not referenced in the visible code -- confirm it
    // is still needed.
    AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord> outputs;
    UberRecordProcessor ubp = new UberRecordProcessor();
    // Example partition layout: "year=%s/month=%s/day=%s/hour=%s"
    // (presumably what getPartition() produces -- confirm).
    private String baseOutputPath;
    // Captured once when the reducer instance is created; reduce() appends it
    // (prefixed with "p") to every output path it writes.
    private long reduceAttemptUniqueIdentifier = System.currentTimeMillis();

    // Example timestamp matching the pattern below: 2015-02-01T18:00:25.673Z
    // NOTE(review): not referenced in the visible code; presumably used by
    // getPartition() -- confirm.
    static DateTimeFormatter dateformat = DateTimeFormat
            .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");

    /**
     * Prepares the reducer: reads the base output folder from the task
     * configuration (key {@link #BASE_OUTPUT_FOLDER}) and creates the
     * {@code AvroMultipleOutputs} writer used by {@code reduce}.
     *
     * <p>NOTE(review): no matching {@code cleanup()} that closes {@code amos} is
     * visible in this excerpt -- confirm one exists, otherwise written files may
     * not be finalized.
     *
     * @param context the reduce task context
     */
    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        baseOutputPath = context.getConfiguration().get(BASE_OUTPUT_FOLDER);
        amos = new AvroMultipleOutputs(context);
    }

    /**
     * Enriches and merges all records that share a key into one
     * {@code UberRecord}, then writes it through {@code amos} to
     * {@code baseOutputPath/<partition>/p<reduceAttemptUniqueIdentifier>}.
     *
     * <p>Records without an exchange are skipped. Output failures
     * ({@link IOException}, {@link InterruptedException}) now propagate to the
     * framework instead of being printed and ignored, so lost writes fail the
     * task. Any other exception is still treated as best-effort for that key,
     * but is reported through the class logger rather than
     * {@code printStackTrace()}.
     *
     * <p>NOTE(review): Hadoop may reuse the same value instance across
     * iterations of {@code values}; {@code mergeUB} stores field values from
     * that instance into the merged record -- confirm whether a defensive copy
     * is needed.
     *
     * @param key     the grouping key (written out as the Avro key string)
     * @param values  all records emitted by the mappers for {@code key}
     * @param context the reduce task context
     * @throws IOException          if writing the merged record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<UberRecord> values, Context context)
            throws IOException, InterruptedException {

        try {
            UberRecord ub = new UberRecord();
            for (UberRecord ubi : values) {
                // Cannot pick an enricher without an exchange; skip the record.
                if (ubi.getExchange() == null) {
                    continue;
                }
                BaseBidRequestEnricher enc = BaseBidRequestEnricher
                        .getEnricher(ubi.getExchange().toString());
                enc.enrich(ubi);
                ub = mergeUB(ub, ubi);
            }
            logger.info("Writing UberRecord [" + ub.toString() + "]");
            String partition = getPartition(ub);

            amos.write(new AvroKey<CharSequence>(key.toString()),
                    new AvroValue<UberRecord>(ub),
                    baseOutputPath + "/" + partition + "/p"
                            + reduceAttemptUniqueIdentifier);
        } catch (IOException e) {
            // Output failure: let the framework see it.
            throw e;
        } catch (InterruptedException e) {
            // Never swallow interruption; the framework relies on it.
            throw e;
        } catch (Exception e) {
            logger.error("Failed to reduce records for key [" + key + "]", e);
        }
    }

    /**
     * Fills every field of {@code dest} that is still {@code null} with the
     * corresponding value from {@code src}, and returns {@code dest}.
     *
     * <p>The {@code engageData} field is merged field by field when
     * {@code dest} already has engage data: only the engage fields that are
     * {@code null} in {@code dest} are taken from {@code src}. If {@code src}
     * has no engage data there is nothing to merge and the field is left
     * untouched (previously this case threw a {@code NullPointerException}).
     *
     * <p>Values are copied by reference, not cloned.
     *
     * @param dest the record being accumulated; modified in place
     * @param src  the record supplying values for fields missing in {@code dest}
     * @return {@code dest}, after merging
     */
    public UberRecord mergeUB(UberRecord dest, UberRecord src) {
        List<Field> fields = UberRecord.getClassSchema().getFields();
        List<Field> engFields = EngageData.getClassSchema().getFields();
        for (Field field : fields) {
            if (field.name().equals("engageData")
                    && dest.getEngageData() != null) {
                EngageData srcEng = src.getEngageData();
                if (srcEng == null) {
                    // Nothing to take from src for this nested record.
                    continue;
                }
                EngageData mergedEng = dest.getEngageData();
                for (Field engField : engFields) {
                    if (mergedEng.get(engField.name()) == null) {
                        mergedEng.put(engField.name(),
                                srcEng.get(engField.name()));
                    }
                }
                dest.setEngageData(mergedEng);
            } else if (dest.get(field.name()) == null) {
                dest.put(field.name(), src.get(field.name()));
            }
        }
        return dest;
    }

Reply via email to