I've added a Jira ticket (https://issues.apache.org/jira/browse/CRUNCH-433) to get better compatibility for this kind of thing directly in Crunch.
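
For reference, here is a minimal, untested sketch of the full driver with Gabriel's fix from the thread below applied: the schema handed to the AvroType constructor is the AvroKeyValue wrapper schema rather than the value schema itself. The LiveTrackingLine generated class and the ExtractViewsJob driver come from Barrie's code in the thread; the Tool scaffolding and the import locations (Crunch 0.10.x / Avro 1.7.x) are assumptions.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.hadoop.io.AvroKeyValue;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.From;
    import org.apache.crunch.io.text.TextFileTarget;
    import org.apache.crunch.types.NoOpDeepCopier;
    import org.apache.crunch.types.avro.AvroType;
    import org.apache.crunch.types.avro.Avros;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    import com.bol.hadoop.enrich.record.LiveTrackingLine;

    public class ExtractViewsJob extends Configured implements Tool {

      @Override
      public int run(String[] args) throws Exception {
        String inputPath = args[0];
        String outputPath = args[1];

        Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
            ExtractViewsJob.class.getSimpleName(), getConf());

        // The files hold KeyValuePair records, so the top-level schema is the
        // AvroKeyValue wrapper schema, not LiveTrackingLine.SCHEMA$ itself.
        Schema keyValueSchema = AvroKeyValue.getSchema(
            Schema.create(Schema.Type.STRING), LiveTrackingLine.SCHEMA$);

        AvroType<GenericData.Record> customAvroType =
            new AvroType<GenericData.Record>(
                GenericData.Record.class, keyValueSchema,
                NoOpDeepCopier.<GenericData.Record>create(),
                Avros.specifics(LiveTrackingLine.class));

        PCollection<GenericData.Record> pcollection =
            pipeline.read(From.avroFile(inputPath, customAvroType));

        pipeline.write(pcollection, new TextFileTarget(outputPath));
        return pipeline.done().succeeded() ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new ExtractViewsJob(), args));
      }
    }
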
On Fri, Jul 4, 2014 at 5:14 PM, Gabriel Reid <[email protected]> wrote:
> Hi Barrie,
>
> It's due to a dumb error on my part, sorry about that. The schema that
> you pass in to the AvroType constructor needs to be the schema for the
> AvroKeyValue class.
>
> So instead of:
>
>     Schema specificRecordSchema = LiveTrackingLine.SCHEMA$;
>
> it should be:
>
>     Schema schema = AvroKeyValue.getSchema(
>         Schema.create(Schema.Type.STRING), LiveTrackingLine.SCHEMA$);
>
> Supplying that schema to the AvroType constructor should get you
> around the error you're running into now.
>
> - Gabriel
>
>
> On Fri, Jul 4, 2014 at 4:30 PM, B Kersbergen <[email protected]> wrote:
>> Hi Gabriel,
>>
>> Yes, all our avro data is wrapped in the Avro KeyValuePair because
>> map-reduce jobs like to reason about keys and values.
>>
>> This is now my code:
>>
>>     String inputPath = args[0];
>>     String outputPath = args[1];
>>
>>     Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
>>         ExtractViewsJob.class.getSimpleName(), getConf());
>>
>>     Schema specificRecordSchema = LiveTrackingLine.SCHEMA$;
>>     AvroType<GenericData.Record> customAvroType =
>>         new AvroType<GenericData.Record>(
>>             GenericData.Record.class, specificRecordSchema,
>>             NoOpDeepCopier.<GenericData.Record>create(),
>>             Avros.specifics(LiveTrackingLine.class));
>>     PCollection<GenericData.Record> pcollection =
>>         pipeline.read(From.avroFile(inputPath, customAvroType));
>>
>>     pipeline.write(pcollection, new TextFileTarget(outputPath));
>>     return pipeline.done().succeeded() ? 0 : 1;
>>
>> When running this I get the following job error:
>>
>>     org.apache.avro.AvroTypeException: Found
>>     org.apache.avro.mapreduce.KeyValuePair, expecting
>>     com.bol.hadoop.enrich.record.LiveTrackingLine
>>         at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
>>         at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>         at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127)
>>
>> (I'm using crunch-core 0.10.0-hadoop2.)
>>
>> Do you have any idea what's going wrong here?
>>
>> 2014-07-04 15:31 GMT+02:00 Gabriel Reid <[email protected]>:
>>> Hi Barrie,
>>>
>>> Inlined below.
>>>
>>> On Fri, Jul 4, 2014 at 2:06 PM, B Kersbergen <[email protected]> wrote:
>>>> Hi,
>>>>
>>>> I'm stuck trying to read an avro KeyValuePair datafile with crunch.
>>>>
>>>> This is a header dump of my avro data file:
>>>>
>>>>     {"type":"record","name":"KeyValuePair","namespace":"org.apache.avro.mapreduce",
>>>>      "doc":"A key/value pair","fields":[{"name":"key","type":"string","doc":"The key"},
>>>>      {"name":"value","type":{"type":"record","name":"LiveTrackingLine",
>>>>      "namespace":"com.bol.hadoop.enrich.record", etc etc
>>>>
>>>> I'm only interested in the LiveTrackingLine object, but I probably need
>>>> to read the whole KeyValuePair object and extract the LiveTrackingLine
>>>> in the crunch pipeline.
>>>>
>>>> This is the code I have so far:
>>>>
>>>>     String inputPath = args[0];
>>>>     String outputPath = args[1];
>>>>     Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
>>>>         ExtractViewsJob.class.getSimpleName(), getConf());
>>>>     PCollection<AvroWrapper<Pair<String, LiveTrackingLine>>> lines =
>>>>         pipeline.read(new AvroFileSource<AvroWrapper<Pair<String,
>>>>             LiveTrackingLine>>>(inputPath));
>>>>
>>>> I'm a bit lost in the last part where I configure the 'pipeline'
>>>> object with the right avro schema(s) and input dir.
>>>
>>> The easiest basic case here, if you just want to read the data, is to do
>>> it as follows:
>>>
>>>     PCollection<GenericData.Record> lines =
>>>         pipeline.read(From.avroFile(inputPath));
>>>
>>> However, as you can see, this will give you a PCollection of
>>> GenericData.Record objects.
>>>
>>>> Because my schema is very complex, I want to parse this as a 'specific'
>>>> and not as a 'generic' or 'reflective' avro representation; this is
>>>> also a learning experience in using avro with crunch.
>>>
>>> This is not currently officially supported in Crunch, although there is
>>> a way to work around it. Because your top-level record type is a
>>> generic record, you need to skip around some of the basic convenience
>>> methods of Crunch to get your specific records read as actual specific
>>> records.
>>>
>>> The workaround you can use is to do something like this to read in
>>> your PCollection:
>>>
>>>     Schema specificRecordSchema = ...;
>>>     AvroType<GenericData.Record> customAvroType =
>>>         new AvroType<GenericData.Record>(
>>>             GenericData.Record.class, specificRecordSchema,
>>>             NoOpDeepCopier.<GenericData.Record>create(),
>>>             Avros.specifics(MySpecificRecord.class));
>>>
>>>     PCollection<GenericData.Record> pcollection =
>>>         pipeline.read(From.avroFile(inputPath, customAvroType));
>>>
>>> This will give you a PCollection of the GenericRecords that contain
>>> instances of your specific class. However, be warned that this is
>>> making use of what could be considered non-public APIs that could
>>> change in the future.
>>>
>>> Is this (reading Avro files that have been serialized using
>>> AvroKeyValue) a case that you will be using regularly? It might be
>>> interesting to add this as built-in functionality in Crunch so you
>>> don't need to mess around with creating your own AvroType.
>>>
>>> - Gabriel
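
As a follow-up to the extraction step Barrie mentions above (he only needs the LiveTrackingLine, not the wrapping KeyValuePair), here is a hedged sketch of how that could look once the custom-AvroType read is in place. The field name "value" comes from the schema dump in the thread, the cast relies on Gabriel's note that the workaround yields GenericRecords containing instances of the specific class, and ExtractValueFn is a hypothetical name.

    import org.apache.avro.generic.GenericData;
    import org.apache.crunch.MapFn;

    import com.bol.hadoop.enrich.record.LiveTrackingLine;

    // Map each generic KeyValuePair record to the specific record held in
    // its "value" field.
    public class ExtractValueFn extends MapFn<GenericData.Record, LiveTrackingLine> {
      @Override
      public LiveTrackingLine map(GenericData.Record keyValueRecord) {
        // "value" is the field name from the KeyValuePair schema; per the
        // thread, the read above deserializes it as the specific class.
        return (LiveTrackingLine) keyValueRecord.get("value");
      }
    }

Applied to the pcollection from the thread, usage would look something like:

    PCollection<LiveTrackingLine> lines = pcollection.parallelDo(
        new ExtractValueFn(), Avros.specifics(LiveTrackingLine.class));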
