My problem:
I have an input file in Avro format whose datums are shuffled (think
ids in mixed order).
I need to sort them by a field from the schema (id), so I run a
mux-demux/shuffle-sort job.
My mapper: reads each datum (GenericRecord) from the Avro input and outputs
key (id) and value (GenericRecord).
My reducer: for each key (id), gets the list of values and writes just the
GenericRecords to a file (part-r-00000).
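
To make the intended data flow concrete, here is a minimal plain-Java sketch of the same shuffle-sort, with no Hadoop or Avro involved. Everything in it is an illustrative assumption: a Map<String, Object> stands in for a GenericRecord, the field names "id" and "payload" are made up, and a TreeMap plays the role of the framework's sort between map and reduce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSortSketch {
    /** Hypothetical stand-in for a GenericRecord with an "id" field. */
    public static Map<String, Object> record(long id, String payload) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("id", id);
        r.put("payload", payload);
        return r;
    }

    /** Map + shuffle: key every record by its id; the TreeMap keeps keys sorted. */
    public static TreeMap<Long, List<Map<String, Object>>> shuffle(
            List<Map<String, Object>> records) {
        TreeMap<Long, List<Map<String, Object>>> grouped = new TreeMap<>();
        for (Map<String, Object> r : records) {
            Long id = (Long) r.get("id");
            grouped.computeIfAbsent(id, k -> new ArrayList<>()).add(r);
        }
        return grouped;
    }

    /** Reduce: emit only the records, in key order, dropping the key itself. */
    public static List<Map<String, Object>> reduce(
            TreeMap<Long, List<Map<String, Object>>> grouped) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (List<Map<String, Object>> values : grouped.values()) {
            out.addAll(values);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> input = Arrays.asList(
                record(3L, "c"), record(1L, "a"), record(2L, "b"));
        // Prints the records one per line, ordered by id.
        for (Map<String, Object> r : reduce(shuffle(input))) {
            System.out.println(r);
        }
    }
}
```

The output contains only records, so in principle it has the same shape as the input; that is the property I expect from the real job as well.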
My expectation is that I can use the same input schema to read the output
file, but this is not working.
In part-r-00000 I see 0<tab>Obj<Avroschema>....datums...... Why is
this?
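
For what it is worth, an Avro object container file is specified to begin with the four bytes 'O', 'b', 'j', 1, so the "Obj" above looks like a container header that does not sit at the start of the file. The following is a small diagnostic sketch, not a fix: it reports the offset at which that magic first appears in the leading bytes of a file. The class and method names are hypothetical, and the sample bytes only imitate what I see in the output.

```java
import java.nio.charset.StandardCharsets;

public class AvroMagicCheck {
    // Per the Avro spec, container files start with 'O', 'b', 'j', followed by byte 1.
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    /** Returns the offset of the first occurrence of the magic in head, or -1 if absent. */
    public static int findMagic(byte[] head) {
        for (int i = 0; i + MAGIC.length <= head.length; i++) {
            boolean match = true;
            for (int j = 0; j < MAGIC.length; j++) {
                if (head[i + j] != MAGIC[j]) {
                    match = false;
                    break;
                }
            }
            if (match) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Imitation of a well-formed header and of the header seen in part-r-00000.
        byte[] wellFormed = "Obj\u0001rest".getBytes(StandardCharsets.ISO_8859_1);
        byte[] prefixed = "0\tObj\u0001rest".getBytes(StandardCharsets.ISO_8859_1);
        System.out.println("well-formed offset: " + findMagic(wellFormed));
        System.out.println("prefixed offset:    " + findMagic(prefixed));
    }
}
```

An offset of 0 would mean the file starts like a normal container; any other non-negative offset means something was written in front of the header, which would explain why a reader using the input schema rejects the file.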
Also, how can I rename the reduce output file to something other than
part-r-0000*?
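
The fallback I can think of is renaming the files after the job finishes. Below is a rough sketch of that idea using java.nio on a local directory; it is only an assumption about how this could be done, and on HDFS the move would presumably go through Hadoop's FileSystem.rename instead. The "sorted" prefix and the ".avro" suffix are made-up naming choices.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class RenamePartFiles {
    private static final String PART_PREFIX = "part-r-";

    /** Maps e.g. "part-r-00000" to prefix + "-00000.avro" (hypothetical naming scheme). */
    public static String newName(String partName, String prefix) {
        String partNumber = partName.substring(PART_PREFIX.length());
        return prefix + "-" + partNumber + ".avro";
    }

    /** Renames every part-r-* file in outputDir and returns the new paths. */
    public static List<Path> renameAll(Path outputDir, String prefix) throws IOException {
        // Collect first, so the directory is not modified while it is being listed.
        List<Path> sources = new ArrayList<>();
        try (DirectoryStream<Path> parts =
                     Files.newDirectoryStream(outputDir, PART_PREFIX + "*")) {
            for (Path p : parts) {
                sources.add(p);
            }
        }
        List<Path> renamed = new ArrayList<>();
        for (Path p : sources) {
            Path target = p.resolveSibling(newName(p.getFileName().toString(), prefix));
            renamed.add(Files.move(p, target));
        }
        return renamed;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java RenamePartFiles <local-output-dir> <new-prefix>
        for (Path p : renameAll(Paths.get(args[0]), args[1])) {
            System.out.println(p);
        }
    }
}
```

I would still prefer a way to have the job write the desired name directly, if one exists.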
Some snippets of code:
================
public void map(GenericData.Record datum,
                AvroCollector<Pair<LogKeyWritable, GenericData.Record>> collector,
                Reporter reporter)
        throws IOException
{
    long tstamp = ((Long) datum.get("timestamp")).longValue();
    String keyPath = CollectorUtils.getKeyHour(tstamp,
                                               ((String) datum.get("appid")));
    LogKeyWritable key = new LogKeyWritable(keyPath, tstamp);
    Pair<LogKeyWritable, GenericData.Record> pair =
        new Pair<LogKeyWritable, GenericData.Record>(key, datum);
    collector.collect(pair);
}

public void reduce(LogKeyWritable key,
                   Iterable<GenericData.Record> values,
                   AvroCollector<GenericData.Record> collector,
                   Reporter reporter)
        throws IOException
{
    for (GenericData.Record r : values)
    {
        collector.collect(r);
    }
}

My job setup:
=========
AvroJob.setInputSchema(jobConf, AVRO_SCHEMA);
AvroJob.setOutputSchema(jobConf, AVRO_SCHEMA);
CAN SOMEONE PLEASE HELP!
Nikhil
--
View this message in context:
http://apache-avro.679487.n3.nabble.com/Avro-Map-Reduce-Question-GenericRecord-renaming-reduce-output-tp4025105.html
Sent from the Avro - Users mailing list archive at Nabble.com.