Hadoop-core is now hadoop-common
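
For a cluster running Amazon Hadoop 2.4.0, something along these lines should replace the hadoop-core/mr1 artifacts (a rough sketch; keep the provided scope and match the version to whatever the AMI actually ships):

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.4.0</version>
      <scope>provided</scope>
    </dependency>
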
Artem Ervits
On Feb 24, 2015 12:57 PM, "David Ginzburg" <[email protected]> wrote:
> I am running on EMR:
> AMI version: 3.3.1
> Hadoop distribution: Amazon 2.4.0
>
> The Hadoop jars are provided.
>
>
> <hadoop.version>2.5.0-mr1-cdh5.2.1</hadoop.version>
>
> <dependencies>
>
> <dependency>
> <groupId>com.google.guava</groupId>
> <artifactId>guava</artifactId>
> <version>18.0</version>
> </dependency>
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka_2.10</artifactId>
> <version>0.8.2-beta</version>
> <scope>compile</scope>
> <exclusions>
> <exclusion>
> <groupId>com.sun.jmx</groupId>
> <artifactId>jmxri</artifactId>
> </exclusion>
> <exclusion>
> <groupId>com.sun.jdmk</groupId>
> <artifactId>jmxtools</artifactId>
> </exclusion>
> <exclusion>
> <groupId>javax.jms</groupId>
> <artifactId>jms</artifactId>
> </exclusion>
> <exclusion>
> <artifactId>slf4j-api</artifactId>
> <groupId>org.slf4j</groupId>
> </exclusion>
> <exclusion>
> <artifactId>snappy-java</artifactId>
> <groupId>org.xerial.snappy</groupId>
> </exclusion>
> </exclusions>
> </dependency>
> <dependency>
> <groupId>com.maxmind.geoip2</groupId>
> <artifactId>geoip2</artifactId>
> <version>2.1.0</version>
> </dependency>
> <dependency>
> <groupId>com.amazonaws</groupId>
> <artifactId>aws-java-sdk</artifactId>
> <version>1.9.3</version>
> </dependency>
> <dependency>
> <groupId>com.google.code.gson</groupId>
> <artifactId>gson</artifactId>
> <version>1.6</version>
> </dependency>
> <dependency>
> <groupId>log4j</groupId>
> <artifactId>log4j</artifactId>
> <version>1.2.17</version>
> </dependency>
> <dependency>
> <groupId>joda-time</groupId>
> <artifactId>joda-time</artifactId>
> <version>2.6</version>
> </dependency>
> <dependency>
> <groupId>org.mockito</groupId>
> <artifactId>mockito-all</artifactId>
> <version>1.9.5</version>
> <scope>test</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.mrunit</groupId>
> <artifactId>mrunit</artifactId>
> <version>1.1.0</version>
> <classifier>hadoop2</classifier>
> <exclusions>
> <exclusion>
> <artifactId>guava</artifactId>
> <groupId>com.google.guava</groupId>
> </exclusion>
> </exclusions>
> </dependency>
> <dependency>
> <groupId>junit</groupId>
> <artifactId>junit</artifactId>
> <version>4.11</version>
> <scope>test</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.hadoop</groupId>
> <artifactId>hadoop-core</artifactId>
> <version>${hadoop.version}</version>
> <scope>provided</scope>
> <exclusions>
> <exclusion>
> <artifactId>jets3t</artifactId>
> <groupId>net.java.dev.jets3t</groupId>
> </exclusion>
> </exclusions>
> </dependency>
> <dependency>
> <artifactId>jets3t</artifactId>
> <groupId>net.java.dev.jets3t</groupId>
> <version>0.9.2</version>
> </dependency>
> <dependency>
> <groupId>org.apache.hadoop</groupId>
> <artifactId>hadoop-client</artifactId>
> <version>${hadoop.version}</version><!-- 2.5.0-mr1-cdh5.2.1 -->
> <scope>provided</scope>
> <exclusions>
> <exclusion>
> <artifactId>guava</artifactId>
> <groupId>com.google.guava</groupId>
> </exclusion>
> <exclusion>
> <artifactId>avro</artifactId>
> <groupId>org.apache.avro</groupId>
> </exclusion>
> <exclusion>
> <artifactId>slf4j-api</artifactId>
> <groupId>org.slf4j</groupId>
> </exclusion>
> </exclusions>
> </dependency>
> <dependency>
> <artifactId>avro-mapred</artifactId>
> <groupId>org.apache.avro</groupId>
> <version>1.7.7</version>
> </dependency>
> <dependency>
> <groupId>org.apache.mrunit</groupId>
> <artifactId>mrunit</artifactId>
> <version>1.0.0</version>
> <classifier>hadoop1</classifier>
> <scope>test</scope>
> </dependency>
> <dependency>
> <groupId>org.json</groupId>
> <artifactId>org.json</artifactId>
> <version>chargebee-1.0</version>
> </dependency>
> <dependency>
> <groupId>com.hadoop.gplcompression</groupId>
> <artifactId>hadoop-lzo</artifactId>
> <version>0.4.19</version>
> </dependency>
> <dependency>
> <groupId>net.sf.uadetector</groupId>
> <artifactId>uadetector-resources</artifactId>
> <version>2014.04</version>
> <exclusions>
> <exclusion>
> <artifactId>slf4j-api</artifactId>
> <groupId>org.slf4j</groupId>
> </exclusion>
> </exclusions>
> </dependency>
>
> <dependency>
> <groupId>commons-io</groupId>
> <artifactId>commons-io</artifactId>
> <version>2.4</version>
> </dependency>
>
> <dependency>
> <groupId>org.codehaus.jackson</groupId>
> <artifactId>jackson-mapper-asl</artifactId>
> <version>1.9.13</version>
> </dependency>
> <dependency>
> <groupId>org.codehaus.jackson</groupId>
> <artifactId>jackson-core-asl</artifactId>
> <version>1.9.13</version>
> </dependency>
>
> </dependencies>
>
>
>
> On Tue, Feb 24, 2015 at 7:38 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <[email protected]>
> wrote:
>
>> Share your Avro dependency versions (in case you're using Maven) and your
>> Hadoop dependency versions.
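>>
>> Running mvn dependency:tree -Dincludes=org.apache.avro,org.apache.hadoop
>> should print the resolved versions.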
>>
>>
>>
>> On Tue, Feb 24, 2015 at 11:06 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <[email protected]>
>> wrote:
>>
>>> Check your Hadoop version. In older Hadoop versions JobContext was a class;
>>> in Hadoop 2 it became an interface, so code compiled against one won't run
>>> against the other.
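>>>
>>> With Avro MR jobs this usually means avro-mapred (or an mr1/hadoop-core
>>> artifact) built against Hadoop 1 is on the classpath. The Hadoop 2 build of
>>> avro-mapred is selected with a classifier, roughly:
>>>
>>> <dependency>
>>>     <groupId>org.apache.avro</groupId>
>>>     <artifactId>avro-mapred</artifactId>
>>>     <version>1.7.7</version>
>>>     <classifier>hadoop2</classifier>
>>> </dependency>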
>>>
>>> On Tue, Feb 24, 2015 at 10:28 PM, David Ginzburg <
>>> [email protected]> wrote:
>>>
>>>> Thank you for the answer.
>>>>
>>>> I tried that, but the exception *Error: Found interface
>>>> org.apache.hadoop.mapreduce.JobContext, but class was expected* persists.
>>>>
>>>> On Tue, Feb 24, 2015 at 3:12 PM, Artem Ervits <[email protected]>
>>>> wrote:
>>>>
>>>>> try this:
>>>>>
>>>>> Job job = Job.getInstance(conf);
>>>>> job.setJobName(name);
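>>>>>
>>>>> (Job.getInstance replaces the deprecated Job constructors, and setJobName
>>>>> is called on the job instance.)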
>>>>>
>>>>> Artem Ervits
>>>>> On Feb 21, 2015 10:57 PM, "David Ginzburg" <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am trying to run an MR job on EMR with AvroMultipleOutputs.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> I get the following exception when running with an AMI with Hadoop 2.2/2.5:
>>>>>>
>>>>>> Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
>>>>>>
>>>>>> I read it is related to incompatible Hadoop versions, so I modified the
>>>>>> Hadoop dependency versions.
>>>>>>
>>>>>> When running with an AMI with Hadoop 1.0.3, I get the following exception:
>>>>>>
>>>>>> java.lang.NullPointerException
>>>>>>   at org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>>>>>>   at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
>>>>>>   at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
>>>>>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>>>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>>>>>>   at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>>>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
>>>>>>   at org.apache.hadoop.mapred.Child.main(Child.java:253)
>>>>>>
>>>>>>
>>>>>> The driver code is:
>>>>>>
>>>>>> Job job = new Job(getConf(), "myad");
>>>>>>
>>>>>> job.setOutputValueClass(NullWritable.class);
>>>>>> job.setJarByClass(myAdTextLineMapper.class);
>>>>>> Path inputOflineFiles = new Path(args[0]);
>>>>>> Path inputOfUbberFiles = new Path(args[1]);
>>>>>>
>>>>>> FileInputFormat.setInputPaths(job, inputOflineFiles);
>>>>>>
>>>>>> job.setMapperClass(myAdTextLineMapper.class);
>>>>>> job.setMapOutputKeyClass(Text.class);
>>>>>> job.setMapOutputValueClass(UberRecord.class);
>>>>>>
>>>>>> job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>>>>>> AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
>>>>>> AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>>>>>>
>>>>>> job.setReducerClass(myAdReducer.class);
>>>>>> job.setOutputKeyClass(Text.class);
>>>>>> job.setOutputValueClass(UberRecord.class);
>>>>>> job.setNumReduceTasks(2);
>>>>>>
>>>>>> String baseoutputFolder = args[2];
>>>>>> job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER, baseoutputFolder);
>>>>>>
>>>>>> LazyOutputFormat.setOutputFormatClass(job, AvroSequenceFileOutputFormat.class);
>>>>>> FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
>>>>>>
>>>>>> return job.waitForCompletion(true) ? 0 : 1;
>>>>>>
>>>>>> The mapper and reducer:
>>>>>>
>>>>>> @Override
>>>>>> public void setup(Context ctx) {
>>>>>>     ubp = new UberRecordProcessor();
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> protected void map(LongWritable key, Text value, Context context)
>>>>>>         throws IOException, InterruptedException {
>>>>>>     try {
>>>>>>         handleLineinMap(value);
>>>>>>         if (ub != null) {
>>>>>>             context.write(new Text(ub.getAuctionId().toString()), ub);
>>>>>>             context.getCounter("myAd", "myAdTextLineMapper").increment(1);
>>>>>>         } else {
>>>>>>             context.getCounter("myAd", "myAdTextLineMapperNull").increment(1);
>>>>>>         }
>>>>>>     } catch (Exception e) {
>>>>>>         context.getCounter("myAd", "myAdTextLineMapperError").increment(1);
>>>>>>         logger.warn("could not parse line " + value.toString(), e);
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> public class myAdReducer extends
>>>>>>         Reducer<Text, UberRecord, AvroKey<CharSequence>, AvroValue<UberRecord>> {
>>>>>>
>>>>>>     private static Logger logger = Logger.getLogger(myAdReducer.class);
>>>>>>     public static final String BASE_OUTPUT_FOLDER = "base.output.folder";
>>>>>>     AvroMultipleOutputs amos;
>>>>>>     MultipleOutputs<Text, UberRecord> outputs;
>>>>>>     UberRecordProcessor ubp = new UberRecordProcessor();
>>>>>>     // "year=%s/month=%s/day=%s/hour=%s"
>>>>>>     private String baseOutputPath;
>>>>>>     private long reduceAttemptUniqueIdentifier = System.currentTimeMillis();
>>>>>>
>>>>>>     // 2015-02-01T18:00:25.673Z
>>>>>>     static DateTimeFormatter dateformat = DateTimeFormat
>>>>>>             .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>>>>>>
>>>>>>     @Override
>>>>>>     protected void setup(Context context) throws IOException,
>>>>>>             InterruptedException {
>>>>>>         amos = new AvroMultipleOutputs(context);
>>>>>>         baseOutputPath = context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     protected void reduce(Text key, Iterable<UberRecord> values, Context context)
>>>>>>             throws IOException, InterruptedException {
>>>>>>         try {
>>>>>>             UberRecord ub = new UberRecord();
>>>>>>             for (UberRecord ubi : values) {
>>>>>>                 // enrich
>>>>>>                 if (ubi.getExchange() == null) {
>>>>>>                     continue;
>>>>>>                 }
>>>>>>                 BaseBidRequestEnricher enc = BaseBidRequestEnricher
>>>>>>                         .getEnricher(ubi.getExchange().toString());
>>>>>>                 enc.enrich(ubi);
>>>>>>                 ub = mergeUB(ub, ubi);
>>>>>>             }
>>>>>>             logger.info("Writing UberRecord [" + ub.toString() + "]");
>>>>>>             String partition = getPartition(ub);
>>>>>>
>>>>>>             // context.write();
>>>>>>             // AvroKey<CharSequence>, AvroValue<UberRecord>>
>>>>>>             amos.write(new AvroKey<CharSequence>(key.toString()),
>>>>>>                     new AvroValue<UberRecord>(ub),
>>>>>>                     baseOutputPath + "/" + partition + "/p" + reduceAttemptUniqueIdentifier);
>>>>>>         } catch (Exception e) {
>>>>>>             // TODO Auto-generated catch block
>>>>>>             e.printStackTrace();
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>>>>>>         List<Field> fields = UberRecord.getClassSchema().getFields();
>>>>>>         List<Field> engFields = EngageData.getClassSchema().getFields();
>>>>>>         for (Field field : fields) {
>>>>>>             if (field.name().equals("engageData")
>>>>>>                     && dest.getEngageData() != null) {
>>>>>>                 EngageData mergedEng = dest.getEngageData();
>>>>>>                 for (Field engField : engFields) {
>>>>>>                     if (dest.getEngageData().get(engField.name()) == null) {
>>>>>>                         mergedEng.put(engField.name(),
>>>>>>                                 src.getEngageData().get(engField.name()));
>>>>>>                     }
>>>>>>                 }
>>>>>>                 dest.setEngageData(mergedEng);
>>>>>>             } else {
>>>>>>                 if (dest.get(field.name()) == null) {
>>>>>>                     dest.put(field.name(), src.get(field.name()));
>>>>>>                 }
>>>>>>             }
>>>>>>         }
>>>>>>         return dest;
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>
>