I am running on EMR:
AMI version: 3.3.1
Hadoop distribution: Amazon 2.4.0
The Hadoop jars are declared with provided scope:
<hadoop.version>2.5.0-mr1-cdh5.2.1</hadoop.version>
<dependencies>
  <dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2-beta</version>
    <scope>compile</scope>
    <exclusions>
      <exclusion>
        <groupId>com.sun.jmx</groupId>
        <artifactId>jmxri</artifactId>
      </exclusion>
      <exclusion>
        <groupId>com.sun.jdmk</groupId>
        <artifactId>jmxtools</artifactId>
      </exclusion>
      <exclusion>
        <groupId>javax.jms</groupId>
        <artifactId>jms</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.xerial.snappy</groupId>
        <artifactId>snappy-java</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>com.maxmind.geoip2</groupId>
    <artifactId>geoip2</artifactId>
    <version>2.1.0</version>
  </dependency>
  <dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.9.3</version>
  </dependency>
  <dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>1.6</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
  <dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.6</version>
  </dependency>
  <dependency>
    <groupId>org.mockito</groupId>
    <artifactId>mockito-all</artifactId>
    <version>1.9.5</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.mrunit</groupId>
    <artifactId>mrunit</artifactId>
    <version>1.1.0</version>
    <classifier>hadoop2</classifier>
    <exclusions>
      <exclusion>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
    <exclusions>
      <exclusion>
        <groupId>net.java.dev.jets3t</groupId>
        <artifactId>jets3t</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>net.java.dev.jets3t</groupId>
    <artifactId>jets3t</artifactId>
    <version>0.9.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version><!-- 2.5.0-mr1-cdh5.2.1 -->
    <scope>provided</scope>
    <exclusions>
      <exclusion>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-mapred</artifactId>
    <version>1.7.7</version>
  </dependency>
  <dependency>
    <groupId>org.apache.mrunit</groupId>
    <artifactId>mrunit</artifactId>
    <version>1.0.0</version>
    <classifier>hadoop1</classifier>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.json</groupId>
    <artifactId>org.json</artifactId>
    <version>chargebee-1.0</version>
  </dependency>
  <dependency>
    <groupId>com.hadoop.gplcompression</groupId>
    <artifactId>hadoop-lzo</artifactId>
    <version>0.4.19</version>
  </dependency>
  <dependency>
    <groupId>net.sf.uadetector</groupId>
    <artifactId>uadetector-resources</artifactId>
    <version>2014.04</version>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.4</version>
  </dependency>
  <dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.13</version>
  </dependency>
  <dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-core-asl</artifactId>
    <version>1.9.13</version>
  </dependency>
</dependencies>
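
For reference, the cluster runs Amazon Hadoop 2.4.0 while the pom builds against
2.5.0-mr1-cdh5.2.1 (an MR1-style CDH artifact), which looks like the kind of mismatch
behind the "Found interface ... but class was expected" error discussed below. A rough
diagnostic sketch, just for illustration (HadoopClasspathCheck is a made-up name;
VersionInfo and JobContext are real Hadoop classes), to see which Hadoop the task JVM
actually loads:

import org.apache.hadoop.util.VersionInfo;

public class HadoopClasspathCheck {
    public static void main(String[] args) throws Exception {
        // Version of the Hadoop jar actually on the classpath.
        System.out.println("Hadoop version: " + VersionInfo.getVersion());

        // In Hadoop 1.x JobContext is a class; in Hadoop 2.x it is an interface.
        // Mixing the two is what produces
        // "Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected".
        Class<?> jobContext = Class.forName("org.apache.hadoop.mapreduce.JobContext");
        System.out.println("JobContext is an interface: " + jobContext.isInterface());

        // Which jar the class was loaded from.
        System.out.println("Loaded from: "
                + jobContext.getProtectionDomain().getCodeSource().getLocation());
    }
}
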
On Tue, Feb 24, 2015 at 7:38 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <[email protected]> wrote:
> Share your Avro dependency versions (if you're using Maven) and your Hadoop
> dependency versions.
>
>
>
> On Tue, Feb 24, 2015 at 11:06 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <[email protected]>
> wrote:
>
>> Check your Hadoop version. In Hadoop 1.x JobContext was a class, and in
>> Hadoop 2.x it is an interface.
>>
>> On Tue, Feb 24, 2015 at 10:28 PM, David Ginzburg <[email protected]
>> > wrote:
>>
>>> Thank you for the answer.
>>>
>>> I tried, but the exception
>>> Error: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
>>> persists.
>>>
>>> On Tue, Feb 24, 2015 at 3:12 PM, Artem Ervits <[email protected]>
>>> wrote:
>>>
>>>> Try this:
>>>>
>>>> Job job = Job.getInstance(conf);
>>>> job.setJobName(name);
>>>>
>>>> Artem Ervits
>>>> On Feb 21, 2015 10:57 PM, "David Ginzburg" <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I am trying to run an MR job on EMR with AvroMultipleOutputs.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> I get the following exception when running with an AMI with Hadoop 2.2/2.5:
>>>>>
>>>>> Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
>>>>>
>>>>> I read that it is related to incompatible Hadoop versions, so I modified the
>>>>> Hadoop dependencies.
>>>>>
>>>>> When running with an AMI with Hadoop 1.0.3 I get the following exception:
>>>>>
>>>>> java.lang.NullPointerException
>>>>>     at org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
>>>>>     at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
>>>>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>>>>>     at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
>>>>>     at org.apache.hadoop.mapred.Child.main(Child.java:253)
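>>>>>
>>>>> About this trace: SerializationFactory.getSerializer throws an NPE when no
>>>>> configured serialization accepts the map output classes. Here the map output
>>>>> value is UberRecord, an Avro specific record that is not a Writable, and nothing
>>>>> registers Avro serialization for the intermediate data. A rough, untested sketch
>>>>> of one way to wire that up in the driver with avro-mapred (same AvroJob, Schema
>>>>> and UberRecord names as in the driver code below; the mapper/reducer would then
>>>>> use AvroKey/AvroValue wrappers for the intermediate key/value):
>>>>>
>>>>>     // In the driver, after the Job is created: declare Avro schemas for the
>>>>>     // intermediate (map output) key/value. This registers AvroSerialization
>>>>>     // and sets the map output classes to AvroKey/AvroValue.
>>>>>     AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
>>>>>     AvroJob.setMapOutputValueSchema(job, UberRecord.SCHEMA$);
>>>>>
>>>>>     // The mapper would then emit, e.g.:
>>>>>     // context.write(new AvroKey<CharSequence>(ub.getAuctionId().toString()),
>>>>>     //               new AvroValue<UberRecord>(ub));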
>>>>>
>>>>>
>>>>> The driver code is
>>>>>
>>>>> Job job = new Job(getConf(), "myad");
>>>>>
>>>>> job.setOutputValueClass(NullWritable.class);
>>>>> job.setJarByClass(myAdTextLineMapper.class);
>>>>>
>>>>> Path inputOflineFiles = new Path(args[0]);
>>>>> Path inputOfUbberFiles = new Path(args[1]);
>>>>> FileInputFormat.setInputPaths(job, inputOflineFiles);
>>>>>
>>>>> job.setMapperClass(myAdTextLineMapper.class);
>>>>> job.setMapOutputKeyClass(Text.class);
>>>>> job.setMapOutputValueClass(UberRecord.class);
>>>>>
>>>>> job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>>>>> AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
>>>>> AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>>>>>
>>>>> job.setReducerClass(myAdReducer.class);
>>>>> job.setOutputKeyClass(Text.class);
>>>>> job.setOutputValueClass(UberRecord.class);
>>>>> job.setNumReduceTasks(2);
>>>>>
>>>>> String baseoutputFolder = args[2];
>>>>> job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER, baseoutputFolder);
>>>>>
>>>>> LazyOutputFormat.setOutputFormatClass(job, AvroSequenceFileOutputFormat.class);
>>>>> FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
>>>>>
>>>>> return job.waitForCompletion(true) ? 0 : 1;
>>>>>
>>>>>
>>>>> The mapper and reducer:
>>>>> @Override
>>>>> public void setup(Context ctx) {
>>>>>     ubp = new UberRecordProcessor();
>>>>> }
>>>>>
>>>>> @Override
>>>>> protected void map(LongWritable key, Text value, Context context)
>>>>>         throws IOException, InterruptedException {
>>>>>     try {
>>>>>         handleLineinMap(value);
>>>>>         if (ub != null) {
>>>>>             context.write(new Text(ub.getAuctionId().toString()), ub);
>>>>>             context.getCounter("myAd", "myAdTextLineMapper").increment(1);
>>>>>         } else {
>>>>>             context.getCounter("myAd", "myAdTextLineMapperNull").increment(1);
>>>>>         }
>>>>>     } catch (Exception e) {
>>>>>         context.getCounter("myAd", "myAdTextLineMapperError").increment(1);
>>>>>         logger.warn("could not parse line " + value.toString(), e);
>>>>>     }
>>>>> }
>>>>>
>>>>> public class myAdReducer extends
>>>>>         Reducer<Text, UberRecord, AvroKey<CharSequence>, AvroValue<UberRecord>> {
>>>>>
>>>>>     private static Logger logger = Logger.getLogger(myAdReducer.class);
>>>>>     public static final String BASE_OUTPUT_FOLDER = "base.output.folder";
>>>>>
>>>>>     AvroMultipleOutputs amos;
>>>>>     MultipleOutputs<Text, UberRecord> outputs;
>>>>>     UberRecordProcessor ubp = new UberRecordProcessor();
>>>>>
>>>>>     // "year=%s/month=%s/day=%s/hour=%s"
>>>>>     private String baseOutputPath;
>>>>>     private long reduceAttemptUniqueIdentifier = System.currentTimeMillis();
>>>>>
>>>>>     // 2015-02-01T18:00:25.673Z
>>>>>     static DateTimeFormatter dateformat = DateTimeFormat
>>>>>             .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>>>>>
>>>>>     @Override
>>>>>     protected void setup(Context context) throws IOException,
>>>>>             InterruptedException {
>>>>>         amos = new AvroMultipleOutputs(context);
>>>>>         baseOutputPath = context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     protected void reduce(Text key, Iterable<UberRecord> values, Context context)
>>>>>             throws IOException, InterruptedException {
>>>>>         try {
>>>>>             UberRecord ub = new UberRecord();
>>>>>             for (UberRecord ubi : values) {
>>>>>                 // enrich
>>>>>                 if (ubi.getExchange() == null) {
>>>>>                     continue;
>>>>>                 }
>>>>>                 BaseBidRequestEnricher enc = BaseBidRequestEnricher
>>>>>                         .getEnricher(ubi.getExchange().toString());
>>>>>                 enc.enrich(ubi);
>>>>>                 ub = mergeUB(ub, ubi);
>>>>>             }
>>>>>             logger.info("Writing UberRecord [" + ub.toString() + "]");
>>>>>             String partition = getPartition(ub);
>>>>>
>>>>>             // context.write();
>>>>>             // AvroKey<CharSequence>, AvroValue<UberRecord>>
>>>>>             amos.write(new AvroKey<CharSequence>(key.toString()),
>>>>>                     new AvroValue<UberRecord>(ub),
>>>>>                     baseOutputPath + "/" + partition + "/p" + reduceAttemptUniqueIdentifier);
>>>>>         } catch (Exception e) {
>>>>>             // TODO Auto-generated catch block
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>     }
>>>>>
>>>>>     public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>>>>>         List<Field> fields = UberRecord.getClassSchema().getFields();
>>>>>         List<Field> engFields = EngageData.getClassSchema().getFields();
>>>>>         for (Field field : fields) {
>>>>>             if (field.name().equals("engageData")
>>>>>                     && dest.getEngageData() != null) {
>>>>>                 EngageData mergedEng = dest.getEngageData();
>>>>>                 for (Field engField : engFields) {
>>>>>                     if (dest.getEngageData().get(engField.name()) == null) {
>>>>>                         mergedEng.put(engField.name(),
>>>>>                                 src.getEngageData().get(engField.name()));
>>>>>                     }
>>>>>                 }
>>>>>                 dest.setEngageData(mergedEng);
>>>>>             } else {
>>>>>                 if (dest.get(field.name()) == null) {
>>>>>                     dest.put(field.name(), src.get(field.name()));
>>>>>                 }
>>>>>             }
>>>>>         }
>>>>>         return dest;
>>>>>     }
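>>>>>
>>>>>     One more note on the reducer above: AvroMultipleOutputs keeps its own record
>>>>>     writers, so they also need to be closed in cleanup(), otherwise the files
>>>>>     written via amos.write(...) may never be flushed or committed. A small sketch
>>>>>     of the usual pattern (same amos field as above):
>>>>>
>>>>>     @Override
>>>>>     protected void cleanup(Context context) throws IOException,
>>>>>             InterruptedException {
>>>>>         // Close all writers opened by AvroMultipleOutputs so the
>>>>>         // per-partition output files are flushed and committed.
>>>>>         amos.close();
>>>>>     }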
>>>>>
>>>>>
>>>>>
>>>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>
>
>
> --
> Deepak
>
>