In my experience, the cause of this behavior is often that the input files lack an .avro extension. The Avro input format skips such files by default, which would leave the job with no map tasks. Is that the case for you? If so, can you retry after renaming them?
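To check quickly which inputs would be affected, here is a minimal plain-Java sketch. It assumes the default filter is a case-sensitive check that each file name ends in ".avro", which is what AvroInputFormat does when `avro.mapred.ignore.inputs.without.extension` is left at its default of true in the Avro releases I've looked at. The class and method names are hypothetical, not part of Avro or Hadoop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper: reports which input names would be skipped if the
 * input format keeps only names ending in ".avro" (case-sensitive).
 */
public class AvroInputCheck {

    // Suffix the default filter is assumed to look for.
    static final String AVRO_EXT = ".avro";

    /** Returns the names that do NOT end in ".avro", in input order. */
    public static List<String> withoutAvroExtension(List<String> fileNames) {
        List<String> skipped = new ArrayList<String>();
        for (String name : fileNames) {
            // A full path ends in ".avro" exactly when its last component does,
            // so this works for bare names and for paths.
            if (!name.endsWith(AVRO_EXT)) {
                skipped.add(name);
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        // Pass the input file names or paths as arguments,
        // e.g. the paths listed by `hadoop fs -ls <input dir>`.
        List<String> skipped = withoutAvroExtension(Arrays.asList(args));
        if (skipped.isEmpty()) {
            System.out.println("All inputs end in " + AVRO_EXT);
        } else {
            System.out.println("Would be skipped: " + skipped);
        }
    }
}
```

If anything is listed, you can either rename those files (e.g. `hadoop fs -mv <file> <file>.avro`) or, assuming your Avro version has the setting, disable the filter in run() with `job.setBoolean("avro.mapred.ignore.inputs.without.extension", false);` before submitting the job.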
On Wed, Jul 31, 2013 at 1:02 AM, Anna Lahoud <[email protected]> wrote:
> I am following the directions at
> http://avro.apache.org/docs/current/api/java/org/apache/avro/mapred/package-summary.html
> to write a job that takes Avro files as input and outputs non-Avro files.
> I created the following job. I should note that I have tried different
> variations of ordering the setInput/OutputPath lines, the AvroJob lines, and
> the reduce task settings. It always ends the same way: the job runs with 0
> mappers and 1 reducer (which gets no data, so it writes an essentially empty
> SequenceFile). It always says there are 10 input files, so that's not the
> issue. There is an @Override annotation on my map and my reduce, so that's
> not the issue either. And I believe I have correctly followed the Avro
> input/non-Avro output instructions in the link above. Any other ideas would
> be welcome!!!
>
> public class MyAvroJob extends Configured implements Tool {
>
>   @Override
>   public int run(String[] args) throws Exception {
>     JobConf job = new JobConf(getConf(), this.getClass());
>
>     FileInputFormat.setInputPaths(job, new Path(args[0]));
>     FileOutputFormat.setOutputPath(job, new Path(args[1]));
>
>     AvroJob.setMapperClass(job, MyAvroMapper.class);
>     AvroJob.setInputSchema(job, MySchema.SCHEMA$);
>     AvroJob.setMapOutputSchema(job,
>         Pair.getPairSchema(Schema.create(Type.STRING), Schema.create(Type.STRING)));
>
>     job.setReducerClass(MyNonAvroReducer.class);
>     job.setOutputFormat(SequenceFileOutputFormat.class);
>     job.setOutputKeyClass(Text.class);
>     job.setOutputValueClass(Text.class);
>     job.setNumReduceTasks(1);
>
>     // Tool.run() returns an int exit code, so map the boolean result
>     return JobClient.runJob(job).isSuccessful() ? 0 : 1;
>   }
>
>   public static class MyAvroMapper extends AvroMapper<MySchema, Pair<String, String>> {
>
>     @Override
>     public void map(MySchema in, AvroCollector<Pair<String, String>> collector,
>         Reporter reporter) throws IOException {
>       List<MyThings> things = in.getRecords();
>       ...
>       collector.collect(new Pair<String, String>(newKey, newValue));
>     }
>   }
>
>   public static class MyNonAvroReducer extends MapReduceBase implements
>       Reducer<AvroKey<String>, AvroValue<String>, Text, Text> {
>
>     @Override
>     public void reduce(AvroKey<String> key, Iterator<AvroValue<String>> values,
>         OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
>       // Unwrap each Avro key/value and emit it as a Text pair
>       while (values.hasNext()) {
>         output.collect(new Text(key.datum()), new Text(values.next().datum()));
>       }
>     }
>   }
>
>   public static void main(String[] args) throws Exception {
>     // Propagate the job's exit code to the shell
>     System.exit(ToolRunner.run(new MyAvroJob(), args));
>   }
> }
>
> -Anna

--
Harsh J
