I am trying to run an MR job from an Oozie workflow, with Avro data files as both input and output. The mapper emits Text and IntWritable. I am using the new MR API (mapreduce). My workflow definition is the following:

    <workflow-app xmlns="uri:oozie:workflow:0.5" name="map-reduce-wf">
        <global>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
        </global>

        <start to="mr-node"/>

        <action name="mr-node">
            <map-reduce>
                <prepare>
                    <delete path="${nameNode}/${outputDir}"/>
                </prepare>
                <configuration>
                    <!-- BEGIN: SNIPPET TO ADD IN ORDER TO MAKE USE OF NEW HADOOP API -->
                    <property>
                      <name>mapred.reducer.new-api</name>
                      <value>true</value>
                    </property>
                    <property>
                      <name>mapred.mapper.new-api</name>
                      <value>true</value>
                    </property>
                    <!-- END: SNIPPET -->
                    <property>
                        <name>mapreduce.map.class</name>
                        <value>com.ncr.bigdata.mr.avro.AvroPifDriver$PifMapper</value>
                    </property>
                    <property>
                        <name>mapreduce.reduce.class</name>
                        <value>com.ncr.bigdata.mr.avro.AvroPifDriver$PifReducer</value>
                    </property>
                    <property>
                        <name>mapred.map.tasks</name>
                        <value>1</value>
                    </property>
                    <property>
                        <name>mapred.input.dir</name>
                        <value>${nameNode}/${inputDir}</value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>${nameNode}/${outputDir}</value>
                    </property>
                    <property>
                        <name>mapred.input.format.class</name>
                        <value>org.apache.avro.mapreduce.AvroKeyInputFormat</value>
                    </property>
                    <property>
                        <name>avro.schema.input.key</name>
                        <value>{"type":"record","name":"SampleRecord","namespace":"org.co.sample.etl.domain","fields":[{"name":"requiredName","type":"string"},{"name":"optionalName","type":["null","string"]},{"name":"dataItemLong","type":"long"},{"name":"dataItemInt","type":"int"},{"name":"startTime","type":"long"},{"name":"endTime","type":"long"}]}</value>
                    </property>


                    <property>
                        <name>mapred.output.format.class</name>
                        <value>org.apache.avro.mapreduce.AvroKeyValueOutputFormat</value>
                    </property>
                    <property>
                        <name>mapred.output.key.class</name>
                        <value>org.apache.avro.mapred.AvroKey</value>
                    </property>
                    <property>
                        <name>mapred.output.value.class</name>
                        <value>org.apache.avro.mapred.AvroValue</value>
                    </property>

                    <property>
                        <name>avro.schema.output.key</name>
                        <value>string</value>
                    </property>
                    <property>
                        <name>avro.schema.output.value</name>
                        <value>int</value>
                    </property>


                </configuration>
            </map-reduce>
            <ok to="end"/>
            <error to="fail"/>
        </action>
        <kill name="fail">
            <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </kill>
        <end name="end"/>
    </workflow-app>
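
As far as I understand, Oozie's map-reduce action essentially copies each <property> above into the job Configuration and then submits the job, so the action should be roughly equivalent to this hand-written submission (my sketch, not Oozie source; OozieActionSketch is just an illustrative name, and only the properties relevant to the API-mode check are shown):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class OozieActionSketch {
        public static void main(String[] args) throws Exception {
            // What the workflow's <configuration> boils down to before submission.
            Configuration conf = new Configuration();
            conf.setBoolean("mapred.mapper.new-api", true);
            conf.setBoolean("mapred.reducer.new-api", true);
            conf.set("mapreduce.map.class", "com.ncr.bigdata.mr.avro.AvroPifDriver$PifMapper");
            conf.set("mapred.input.format.class", "org.apache.avro.mapreduce.AvroKeyInputFormat");

            Job job = Job.getInstance(conf);
            job.submit();  // the old-vs-new API consistency checks run here
        }
    }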

My mapper looks like the following:

    import java.io.IOException;

    import org.apache.avro.mapred.AvroKey;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public static class PifMapper extends Mapper<AvroKey<PosData>, NullWritable, Text, IntWritable> {

        @Override
        public void map(AvroKey<PosData> key, NullWritable value, Context context)
                throws IOException, InterruptedException {
            ...
        }
    }
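
For context, the reducer wraps the mapper's Text/IntWritable pairs back into Avro types to match AvroKeyValueOutputFormat and the string/int output schemas above. A word-count-style sketch of its shape (not my exact body):

    import java.io.IOException;

    import org.apache.avro.mapred.AvroKey;
    import org.apache.avro.mapred.AvroValue;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public static class PifReducer extends Reducer<Text, IntWritable, AvroKey<CharSequence>, AvroValue<Integer>> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Aggregate the IntWritable counts and emit them as Avro string/int,
            // matching avro.schema.output.key and avro.schema.output.value.
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(new AvroKey<CharSequence>(key.toString()),
                          new AvroValue<Integer>(sum));
        }
    }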

I am getting the following error:

    140807041959771-oozie-oozi-W@mr-node] Launcher exception: mapred.input.format.class is incompatible with new map API mode.
    java.io.IOException: mapred.input.format.class is incompatible with new map API mode.
    at org.apache.hadoop.mapreduce.Job.ensureNotSet(Job.java:1172)
    at org.apache.hadoop.mapreduce.Job.setUseNewAPI(Job.java:1198)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1261)
    at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:562)
    at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:557)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    ...
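
Looking at the Hadoop source, the exception comes from a check that runs at submit time: once mapred.mapper.new-api is true, Job.setUseNewAPI() refuses any old-API key that is still set. Paraphrased from Hadoop 2.2 (not the verbatim source):

    // Paraphrase of the private check in org.apache.hadoop.mapreduce.Job:
    // with the new mapper API enabled, the old-API keys must not be set at all.
    private void ensureNotSet(String attr, String msg) throws IOException {
        if (conf.get(attr) != null) {
            throw new IOException(attr + " is incompatible with " + msg + " mode.");
        }
    }

    // setUseNewAPI() then calls, among others:
    //   ensureNotSet("mapred.input.format.class", "new map API");  // <- my key
    //   ensureNotSet("mapred.mapper.class", "new map API");

So it seems to be the old-style mapred.input.format.class key itself that trips the check, regardless of which API the configured class implements.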

I am using Hadoop 2.2.0 (HDP 2.0), Oozie 4.0.0, and Avro 1.7.4.

MapReduce jobs submitted via a driver class work fine, and org.apache.avro.mapreduce.AvroKeyInputFormat should be an implementation for the new mapreduce API as well.
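
For comparison, the driver does roughly this (condensed sketch, not my exact code; AvroPifDriverSketch is just an illustrative wrapper, and PosData is an Avro-generated record, so getClassSchema() is available):

    import org.apache.avro.Schema;
    import org.apache.avro.mapreduce.AvroJob;
    import org.apache.avro.mapreduce.AvroKeyInputFormat;
    import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class AvroPifDriverSketch {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "avro-pif");
            job.setJarByClass(AvroPifDriverSketch.class);
            job.setMapperClass(AvroPifDriver.PifMapper.class);
            job.setReducerClass(AvroPifDriver.PifReducer.class);

            // Same formats as in the workflow, but set through the new API:
            job.setInputFormatClass(AvroKeyInputFormat.class);
            job.setOutputFormatClass(AvroKeyValueOutputFormat.class);

            // Schemas go through AvroJob rather than avro.schema.* properties:
            AvroJob.setInputKeySchema(job, PosData.getClassSchema());
            AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
            AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }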

To make sure that there is no lib clash, I removed the shared lib from Oozie, and all libs are included in the workflow's lib dir.

Any hints?


-- 
Jakub Stransky
