Hi Barrie,

Inlined below.
On Fri, Jul 4, 2014 at 2:06 PM, B Kersbergen wrote:
> Hi,
>
> I’m stuck trying to read an avro KeyValuePair datafile with crunch.
>
> This is a header dump of my avro data file:
>
> {"type":"record","name":"KeyValuePair","namespace":"org.apache.avro.mapreduce","doc":"A
> key/value pair","fields":[{"name":"key","type":"string","doc":"The
> key"},{"name":"value","type":{"type":"record","name":"LiveTrackingLine","namespace":"com.bol.hadoop.enrich.record",
> etc etc
>
> I’m only interested in the LiveTrackingLine object but I probably need
> to read the whole KeyValuePair object and extract the LiveTrackingLine
> in the crunch pipeline.
>
> This is the code I have so far.
>
> String inputPath = args[0];
> String outputPath = args[1];
> Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
> ExtractViewsJob.class.getSimpleName(), getConf());
> PCollection<AvroWrapper <Pair<String, LiveTrackingLine>>> lines =
> pipeline.read(new AvroFileSource<AvroWrapper<Pair<String,
> LiveTrackingLine>>>(inputPath));
>
> I’m a bit lost in the last part where I configure the 'pipeline'
> object with the right avro schema(s) and input dir.

The easiest way to simply read the data is as follows:

    PCollection<GenericData.Record> lines = pipeline.read(From.avroFile(inputPath));

However, as you can see, this will give you a PCollection of GenericData.Record objects.

> Because my schema is very complex I
> want to parse this as a ‘specific’ and not as a ‘generic’ or
> ‘reflective’ avro representation, this is also a learning experience
> in using avro with crunch.

This isn't currently officially supported in Crunch, although there is a way to work around it. Because your top-level record type (KeyValuePair) is a generic record, you need to bypass some of Crunch's basic convenience methods to get your specific records read as actual specific records.
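To make the extraction step concrete: once the KeyValuePair records are read, pulling out the LiveTrackingLine comes down to a field lookup on "value" plus a cast, and that cast only works if the reader materialized the field as the specific class (which is what the workaround that follows is for). The sketch below models this in plain Java so it runs without Hadoop. A Map<String, Object> stands in for GenericData.Record, and LiveTrackingLine, keyValuePair, EXTRACT_VALUE and extractAll are hypothetical stand-ins, not Crunch or Avro APIs. In an actual pipeline the EXTRACT_VALUE step would most likely be a MapFn applied with parallelDo, using Avros.specifics(LiveTrackingLine.class) as the output PType.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class KeyValueExtractSketch {

    // Hypothetical stand-in for the Avro-generated specific class (LiveTrackingLine).
    static final class LiveTrackingLine {
        final String page;

        LiveTrackingLine(String page) {
            this.page = page;
        }
    }

    // Models a generic KeyValuePair record: fields are looked up by name and typed as Object,
    // similar to how GenericData.Record.get(String) returns Object.
    static Map<String, Object> keyValuePair(String key, Object value) {
        Map<String, Object> record = new HashMap<>();
        record.put("key", key);
        record.put("value", value);
        return record;
    }

    // Plays the role a Crunch MapFn would: take the "value" field and cast it to the specific class.
    // The cast only succeeds if the reader already materialized that field as the specific class.
    static final Function<Map<String, Object>, LiveTrackingLine> EXTRACT_VALUE =
            record -> (LiveTrackingLine) record.get("value");

    static List<LiveTrackingLine> extractAll(List<Map<String, Object>> records) {
        return records.stream().map(EXTRACT_VALUE).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Value fields already hold specific objects: extraction is just a cast.
        List<Map<String, Object>> specificRecords = List.of(
                keyValuePair("k1", new LiveTrackingLine("/home")),
                keyValuePair("k2", new LiveTrackingLine("/cart")));
        for (LiveTrackingLine line : extractAll(specificRecords)) {
            System.out.println(line.page);
        }

        // Value field read generically (modeled here as a nested Map): the same cast fails.
        Map<String, Object> genericValue = new HashMap<>();
        genericValue.put("page", "/home");
        try {
            extractAll(List.of(keyValuePair("k3", genericValue)));
        } catch (ClassCastException e) {
            System.out.println("generic value cannot be cast to LiveTrackingLine");
        }
    }
}
```

The second half of main is the point of the exercise: reading everything generically leaves you with nested generic records, so you would have to copy fields by hand instead of simply casting.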
The workaround is to do something like this to read in your PCollection:

    Schema specificRecordSchema = ...;
    AvroType<GenericData.Record> customAvroType = new AvroType<GenericData.Record>(
        GenericData.Record.class,
        specificRecordSchema,
        NoOpDeepCopier.<GenericData.Record>create(),
        Avros.specifics(MySpecificRecord.class));
    PCollection<GenericData.Record> pcollection =
        pipeline.read(From.avroFile(inputPath, customAvroType));

This will give you a PCollection of GenericData.Record objects that contain instances of your specific class. Be warned, however, that this relies on what could be considered non-public APIs, which may change in the future.

Is reading Avro files written with AvroKeyValue something you'll be doing regularly? If so, it might be worth adding this as built-in functionality in Crunch, so you don't need to mess around with creating your own AvroType.

- Gabriel
